14#include "arccore/message_passing_mpi/MpiSerializeMessageList.h"
15#include "arccore/message_passing_mpi/MpiSerializeDispatcher.h"
16#include "arccore/message_passing_mpi/MpiAdapter.h"
17#include "arccore/message_passing/BasicSerializeMessage.h"
18#include "arccore/trace/ITraceMng.h"
19#include "arccore/base/FatalErrorException.h"
20#include "arccore/base/TimeoutException.h"
21#include "arccore/base/NotSupportedException.h"
28namespace Arccore::MessagePassing::Mpi
40 return _SortMessages::compare(m1,m2);
45 return compare(pm1,pm2);
59 if (dest_p1==dest_p2){
64 if (orig_p1==orig_p2 && (p1_tag!=p2_tag))
65 return p1_tag < p2_tag;
70 if (orig_p1==orig_p2 && (p1_tag!=p2_tag))
71 return p1_tag < p2_tag;
77 if (dest_p1 < dest_p2)
89MpiSerializeMessageList::
91: m_dispatcher(dispatcher)
92, m_adapter(dispatcher->adapter())
93, m_trace(m_adapter->traceMng())
94, m_message_passing_phase(timeMetricPhaseMessagePassing(m_adapter->timeMetricCollector()))
105 ARCCORE_FATAL(
"Can not convert 'ISerializeMessage' to 'BasicSerializeMessage'");
107 ARCCORE_FATAL(
"Invalid source '{0}' for send message (expected={1})",
109 m_messages_to_process.
add(true_message);
134 const bool print_sorted =
false;
136 for(
Integer i=0, is=m_messages_to_process.
size(); i<is; ++i ){
138 msg->
debug() <<
"Sorted message " << i
139 <<
" orig=" << pmsg->
source()
141 <<
" tag=" << pmsg->internalTag()
142 <<
" send?=" << pmsg->
isSend();
146 Int64 serialize_buffer_size = m_dispatcher->serializeBufferSize();
147 for(
Integer i=0; i<nb_message; ++i ){
159 const bool do_old =
false;
161 if (is_one_message_strategy)
163 new_request = m_dispatcher->legacySendSerializer(pmsg->
serializer(),{dest,tag,NonBlocking});
170 sbuf->preallocate(serialize_buffer_size);
171 MessageId message_id = pmsg->_internalMessageId();
172 if (message_id.isValid()){
178 if (is_one_message_strategy)
180 new_request = m_dispatcher->_recvSerializerBytes(sbuf->globalBuffer(),message_id,
false);
183 new_request = m_dispatcher->_recvSerializerBytes(sbuf->globalBuffer(),dest,tag,
false);
185 mpi_msg->setIsProcessed(
true);
189 m_messages_to_process.
clear();
199 Integer n = _waitMessages(wait_type);
200 m_dispatcher->checkFinishedSubRequests();
207Integer MpiSerializeMessageList::
211 if (wait_type==WaitAll){
212 while (_waitMessages2(
WaitSome)!=(-1))
216 return _waitMessages2(wait_type);
222Integer MpiSerializeMessageList::
225 Integer nb_message_finished = 0;
227 Integer nb_message = m_messages_request.size();
232 done_indexes.fill(
false);
236 for(
Integer z=0; z<nb_message; ++z ){
237 requests[z] = m_messages_request[z].m_request;
241 msg->
info() <<
"Waiting for rank =" << comm_rank <<
" nb_message=" << nb_message;
243 for(
Integer z=0; z<nb_message; ++z ){
244 BasicSerializeMessage* msm = m_messages_request[z].m_mpi_message;
245 msg->
info() <<
"Waiting for message: "
246 <<
" rank=" << comm_rank
247 <<
" issend=" << msm->isSend()
248 <<
" dest=" << msm->destination()
249 <<
" tag=" << msm->internalTag()
250 <<
" request=" << requests[z];
254 mpi_status.resize(nb_message);
255 MpiAdapter* adapter = m_adapter;
259 ARCCORE_FATAL(
"Bad value WaitAll");
261 msg->
debug() <<
" rank=" << comm_rank <<
"Wait some " << nb_message;
263 adapter->waitSomeRequestsMPI(requests,done_indexes,mpi_status,
false);
266 msg->
debug() <<
" rank=" << comm_rank <<
"Wait some non blocking " << nb_message;
268 adapter->waitSomeRequestsMPI(requests,done_indexes,mpi_status,
true);
272 catch(
const TimeoutException&){
273 std::ostringstream ostr;
274 for(
Integer z=0; z<nb_message; ++z ){
275 BasicSerializeMessage* message = m_messages_request[z].m_mpi_message;
276 ostr <<
"IndexReturn message: "
277 <<
" issend=" << message->isSend()
278 <<
" dest=" << message->destination()
279 <<
" done_index=" << done_indexes[z]
280 <<
" status_src=" << mpi_status[z].MPI_SOURCE
281 <<
" status_tag=" << mpi_status[z].MPI_TAG
282 <<
" status_err=" << mpi_status[z].MPI_ERROR
283 <<
" request=" << requests[z]
286 msg->
pinfo() <<
"Info messages: myrank=" << comm_rank <<
" " << ostr.str();
290 for(
Integer z=0; z<nb_message; ++z ){
291 BasicSerializeMessage* message = m_messages_request[z].m_mpi_message;
292 bool is_send = message->isSend();
293 MessageRank destination = message->destination();
294 Int64 message_size = message->trueSerializer()->totalSize();
296 msg->
info() <<
"IndexReturn message: Send: "
297 <<
" dest=" << destination
298 <<
" size=" << message_size
299 <<
" done_index=" << done_indexes[z]
300 <<
" request=" << requests[z];
302 msg->
info() <<
"IndexReturn message: Recv: "
303 <<
" dest=" << destination
304 <<
" size=" << message_size
305 <<
" done_index=" << done_indexes[z]
306 <<
" request=" << requests[z]
307 <<
" status_src=" << mpi_status[z].MPI_SOURCE
308 <<
" status_tag=" << mpi_status[z].MPI_TAG
309 <<
" status_err=" << mpi_status[z].MPI_ERROR;
313 UniqueArray<MpiSerializeMessageRequest> new_messages;
315 int mpi_status_index = 0;
316 for(
Integer i=0; i<nb_message; ++i ){
317 BasicSerializeMessage* mpi_msg = m_messages_request[i].m_mpi_message;
318 if (done_indexes[i]){
319 MPI_Status status = mpi_status[mpi_status_index];
320 Request rq = requests[i];
326 MessageRank source(status.MPI_SOURCE);
327 MessageTag tag(status.MPI_TAG);
329 msg->
info() <<
"Message number " << i <<
" Finished, source=" << source
331 <<
" err=" << status.MPI_ERROR
332 <<
" is_send=" << mpi_msg->isSend()
333 <<
" request=" << rq;
339 msg->
info() <<
"Add new receive operation for message number " << i
341 new_messages.add(MpiSerializeMessageRequest(mpi_msg,r));
344 mpi_msg->setFinished(
true);
345 ++nb_message_finished;
350 msg->
info() <<
"Message number " << i <<
" not finished"
351 <<
" request=" << requests[i];
352 new_messages.add(MpiSerializeMessageRequest(mpi_msg,requests[i]));
356 m_messages_request = new_messages;
357 if (m_messages_request.empty())
359 return nb_message_finished;
372 m_trace->
info() <<
"Process one message msg=" <<
this
373 <<
" number=" << message->messageNumber()
374 <<
" is_send=" << message->
isSend();
390 Int64 message_size = sbuf->totalSize();
393 if (dest_rank.
isNull() && !m_adapter->isAllowNullRankForAnySource())
394 ARCCORE_FATAL(
"Can not use MPI_Mprobe with null rank. Use MessageRank::anySourceRank() instead");
401 m_trace->
info() <<
"Process one message (GlobalBuffer) msg=" <<
this
402 <<
" number=" << message->messageNumber()
403 <<
" is_send=" << message->
isSend()
404 <<
" dest_rank=" << dest_rank
405 <<
" size=" << message_size
406 <<
" (buf_size=" << m_dispatcher->serializeBufferSize() <<
")";
412 if (message->messageNumber()==0){
413 if (message_size<=m_dispatcher->serializeBufferSize()
415 sbuf->setFromSizes();
418 m_dispatcher->_checkBigMessage(message_size);
419 sbuf->preallocate(message_size);
421 MessageTag next_tag = MpiSerializeDispatcher::nextSerializeTag(mpi_tag);
422 request = m_dispatcher->_recvSerializerBytes(bytes,dest_rank,next_tag,
false);
423 message->setMessageNumber(1);
426 sbuf->setFromSizes();
437 auto x = BasicSerializeMessage::create(source,destination,type);
Integer size() const
Nombre d'éléments du vecteur.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
void clear()
Supprime les éléments du tableau.
Interface du gestionnaire de traces.
virtual TraceMessageDbg debug(Trace::eDebugLevel=Trace::Medium)=0
Flot pour un message de debug.
virtual TraceMessage pinfo()=0
Flot pour un message d'information parallèle.
virtual Int32 verbosityLevel() const =0
Niveau de verbosité des messages.
virtual TraceMessage info()=0
Flot pour un message d'information.
virtual void flush()=0
Flush tous les flots.
virtual ISerializer * serializer()=0
Sérialiseur.
virtual eStrategy strategy() const =0
Stratégie utilisée pour les envois/réceptions.
virtual MessageRank destination() const =0
Rang du destinataire (si isSend() est vrai) ou de l'envoyeur.
virtual bool isSend() const =0
true s'il faut envoyer, false s'il faut recevoir
virtual MessageRank source() const =0
Rang de l'envoyeur du message.
@ OneMessage
Stratégie utilisant un seul message si possible.
MessageSourceInfo sourceInfo() const
Informations sur la source du message.
bool isAnySource() const
Vrai si rang correspondant à anySourceRank()
bool isNull() const
Vrai si rang non initialisé correspondant au rang par défaut.
Int64 size() const
Taille du message.
int commRank() const
Rang de cette instance dans le communicateur.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Request _processOneMessage(BasicSerializeMessage *msm, MessageRank source, MessageTag mpi_tag)
Effectue la requête. Retourne une éventuelle requête si non nul.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
Integer waitMessages(eWaitType wait_type) override
Attend que les messages aient terminé leur exécution.
Request _processOneMessageGlobalBuffer(BasicSerializeMessage *msm, MessageRank source, MessageTag mpi_tag)
Effectue la requête. Retourne une éventuelle requête si non nul.
Ref< ISerializeMessage > createAndAddMessage(MessageRank destination, ePointToPointMessageType type) override
Crée et ajoute un message de sérialisation.
MessageRank destination() const override
Rang du destinataire (si isSend() est vrai) ou de l'envoyeur.
eStrategy strategy() const override
Stratégie utilisée pour les envois/réceptions.
bool isSend() const override
true s'il faut envoyer, false s'il faut recevoir
MessageRank source() const override
Rang de l'envoyeur du message.
Vue d'un tableau d'éléments de type T.
Sentinelle pour collecter les informations temporelles.
Vecteur 1D de données avec sémantique par valeur (style STL).
ePointToPointMessageType
Type de message point à point.
@ WaitSome
Attend qu'au moins un message de la liste soit traité.
Int32 Integer
Type représentant un entier.
std::int64_t Int64
Type entier signé sur 64 bits.
std::int32_t Int32
Type entier signé sur 32 bits.