14#include "arccore/message_passing_mpi/MpiSerializeMessageList.h"
15#include "arccore/message_passing_mpi/MpiSerializeDispatcher.h"
16#include "arccore/message_passing_mpi/MpiAdapter.h"
17#include "arccore/message_passing/BasicSerializeMessage.h"
18#include "arccore/trace/ITraceMng.h"
19#include "arccore/base/FatalErrorException.h"
20#include "arccore/base/TimeoutException.h"
21#include "arccore/base/NotSupportedException.h"
28namespace Arccore::MessagePassing::Mpi
40 return _SortMessages::compare(
m1,
m2);
89MpiSerializeMessageList::
91: m_dispatcher(dispatcher)
92, m_adapter(dispatcher->adapter())
93, m_trace(m_adapter->traceMng())
94, m_message_passing_phase(timeMetricPhaseMessagePassing(m_adapter->timeMetricCollector()))
105 ARCCORE_FATAL(
"Can not convert 'ISerializeMessage' to 'BasicSerializeMessage'");
107 ARCCORE_FATAL(
"Invalid source '{0}' for send message (expected={1})",
136 for(
Integer i=0, is=m_messages_to_process.
size(); i<is; ++i ){
138 msg->
debug() <<
"Sorted message " << i
139 <<
" orig=" <<
pmsg->source()
140 <<
" dest=" <<
pmsg->destination()
141 <<
" tag=" <<
pmsg->internalTag()
142 <<
" send?=" <<
pmsg->isSend();
159 const bool do_old =
false;
163 new_request = m_dispatcher->legacySendSerializer(
pmsg->serializer(),{dest,tag,NonBlocking});
183 new_request = m_dispatcher->_recvSerializerBytes(
sbuf->globalBuffer(),dest,tag,
false);
189 m_messages_to_process.
clear();
200 m_dispatcher->checkFinishedSubRequests();
207Integer MpiSerializeMessageList::
212 while (_waitMessages2(
WaitSome)!=(-1))
216 return _waitMessages2(wait_type);
222Integer MpiSerializeMessageList::
225 Integer nb_message_finished = 0;
227 Integer nb_message = m_messages_request.size();
232 done_indexes.fill(
false);
236 for(
Integer z=0; z<nb_message; ++z ){
237 requests[z] = m_messages_request[z].m_request;
241 msg->
info() <<
"Waiting for rank =" << comm_rank <<
" nb_message=" << nb_message;
243 for( Integer z=0; z<nb_message; ++z ){
244 BasicSerializeMessage* msm = m_messages_request[z].m_mpi_message;
245 msg->
info() <<
"Waiting for message: "
246 <<
" rank=" << comm_rank
247 <<
" issend=" << msm->isSend()
248 <<
" dest=" << msm->destination()
249 <<
" tag=" << msm->internalTag()
250 <<
" request=" << requests[z];
254 mpi_status.resize(nb_message);
255 MpiAdapter* adapter = m_adapter;
259 ARCCORE_FATAL(
"Bad value WaitAll");
261 msg->
debug() <<
" rank=" << comm_rank <<
"Wait some " << nb_message;
263 adapter->waitSomeRequestsMPI(requests,done_indexes,mpi_status,
false);
266 msg->
debug() <<
" rank=" << comm_rank <<
"Wait some non blocking " << nb_message;
268 adapter->waitSomeRequestsMPI(requests,done_indexes,mpi_status,
true);
272 catch(
const TimeoutException&){
273 std::ostringstream ostr;
274 for( Integer z=0; z<nb_message; ++z ){
275 BasicSerializeMessage* message = m_messages_request[z].m_mpi_message;
276 ostr <<
"IndexReturn message: "
277 <<
" issend=" << message->isSend()
278 <<
" dest=" << message->destination()
279 <<
" done_index=" << done_indexes[z]
280 <<
" status_src=" << mpi_status[z].MPI_SOURCE
281 <<
" status_tag=" << mpi_status[z].MPI_TAG
282 <<
" status_err=" << mpi_status[z].MPI_ERROR
283 <<
" request=" << requests[z]
286 msg->
pinfo() <<
"Info messages: myrank=" << comm_rank <<
" " << ostr.str();
290 for( Integer z=0; z<nb_message; ++z ){
291 BasicSerializeMessage* message = m_messages_request[z].m_mpi_message;
292 bool is_send = message->isSend();
293 MessageRank destination = message->destination();
294 Int64 message_size = message->trueSerializer()->totalSize();
296 msg->
info() <<
"IndexReturn message: Send: "
297 <<
" dest=" << destination
298 <<
" size=" << message_size
299 <<
" done_index=" << done_indexes[z]
300 <<
" request=" << requests[z];
302 msg->
info() <<
"IndexReturn message: Recv: "
303 <<
" dest=" << destination
304 <<
" size=" << message_size
305 <<
" done_index=" << done_indexes[z]
306 <<
" request=" << requests[z]
307 <<
" status_src=" << mpi_status[z].MPI_SOURCE
308 <<
" status_tag=" << mpi_status[z].MPI_TAG
309 <<
" status_err=" << mpi_status[z].MPI_ERROR;
313 UniqueArray<MpiSerializeMessageRequest> new_messages;
315 int mpi_status_index = 0;
316 for( Integer i=0; i<nb_message; ++i ){
317 BasicSerializeMessage* mpi_msg = m_messages_request[i].m_mpi_message;
318 if (done_indexes[i]){
319 MPI_Status status = mpi_status[mpi_status_index];
320 Request rq = requests[i];
326 MessageRank source(status.MPI_SOURCE);
327 MessageTag tag(status.MPI_TAG);
329 msg->
info() <<
"Message number " << i <<
" Finished, source=" << source
331 <<
" err=" << status.MPI_ERROR
332 <<
" is_send=" << mpi_msg->isSend()
333 <<
" request=" << rq;
339 msg->
info() <<
"Add new receive operation for message number " << i
341 new_messages.add(MpiSerializeMessageRequest(mpi_msg,r));
344 mpi_msg->setFinished(
true);
345 ++nb_message_finished;
350 msg->
info() <<
"Message number " << i <<
" not finished"
351 <<
" request=" << requests[i];
352 new_messages.add(MpiSerializeMessageRequest(mpi_msg,requests[i]));
356 m_messages_request = new_messages;
357 if (m_messages_request.empty())
359 return nb_message_finished;
372 m_trace->
info() <<
"Process one message msg=" <<
this
373 <<
" number=" << message->messageNumber()
374 <<
" is_send=" << message->
isSend();
393 if (
dest_rank.isNull() && !m_adapter->isAllowNullRankForAnySource())
394 ARCCORE_FATAL(
"Can not use MPI_Mprobe with null rank. Use MessageRank::anySourceRank() instead");
401 m_trace->
info() <<
"Process one message (GlobalBuffer) msg=" <<
this
402 <<
" number=" << message->messageNumber()
403 <<
" is_send=" << message->
isSend()
406 <<
" (buf_size=" << m_dispatcher->serializeBufferSize() <<
")";
412 if (message->messageNumber()==0){
415 sbuf->setFromSizes();
423 message->setMessageNumber(1);
426 sbuf->setFromSizes();
437 auto x = BasicSerializeMessage::create(source,destination,type);
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Integer size() const
Nombre d'éléments du vecteur.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
void clear()
Supprime les éléments du tableau.
Implémentation basique de 'ISerializer'.
Interface du gestionnaire de traces.
virtual TraceMessageDbg debug(Trace::eDebugLevel=Trace::Medium)=0
Flot pour un message de debug.
virtual TraceMessage pinfo()=0
Flot pour un message d'information parallèle.
virtual Int32 verbosityLevel() const =0
Niveau de verbosité des messages.
virtual TraceMessage info()=0
Flot pour un message d'information.
virtual void flush()=0
Flush tous les flots.
Interface d'un message de sérialisation entre IMessagePassingMng.
@ OneMessage
Stratégie utilisant un seul message si possible.
int commRank() const
Rang de cette instance dans le communicateur.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Request _processOneMessage(BasicSerializeMessage *msm, MessageRank source, MessageTag mpi_tag)
Effectue la requête. Retourne une éventuelle requête si non nul.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
Integer waitMessages(eWaitType wait_type) override
Attend que les messages aient terminé leur exécution.
Request _processOneMessageGlobalBuffer(BasicSerializeMessage *msm, MessageRank source, MessageTag mpi_tag)
Effectue la requête. Retourne une éventuelle requête si non nul.
Ref< ISerializeMessage > createAndAddMessage(MessageRank destination, ePointToPointMessageType type) override
Crée et ajoute un message de sérialisation.
Message de sérialisation utilisant un BasicSerializer.
MessageRank destination() const override
Rang du destinataire (si isSend() est vrai) ou de l'envoyeur.
eStrategy strategy() const override
Stratégie utilisée pour les envois/réceptions.
bool isSend() const override
true s'il faut envoyer, false s'il faut recevoir
Exception lorsqu'une opération n'est pas supportée.
Sentinelle pour collecter les informations temporelles.
TraceMessageDbg debug(Trace::eDebugLevel=Trace::Medium) const
Flot pour un message de debug.
Vecteur 1D de données avec sémantique par valeur (style STL).
ePointToPointMessageType
Type de message point à point.
@ WaitSome
Attend qu'au moins un message de la liste soit traité.
Int32 Integer
Type représentant un entier.
std::int32_t Int32
Type entier signé sur 32 bits.