14#include "arccore/message_passing_mpi/MpiSerializeMessageList.h"
15#include "arccore/message_passing_mpi/MpiSerializeDispatcher.h"
16#include "arccore/message_passing_mpi/MpiAdapter.h"
17#include "arccore/message_passing/BasicSerializeMessage.h"
18#include "arccore/trace/ITraceMng.h"
19#include "arccore/base/FatalErrorException.h"
20#include "arccore/base/TimeoutException.h"
21#include "arccore/base/NotSupportedException.h"
28namespace Arcane::MessagePassing::Mpi
41 return _SortMessages::compare(
m1,
m2);
90MpiSerializeMessageList::
92: m_dispatcher(dispatcher)
93, m_adapter(dispatcher->adapter())
94, m_trace(m_adapter->traceMng())
95, m_message_passing_phase(timeMetricPhaseMessagePassing(m_adapter->timeMetricCollector()))
106 ARCCORE_FATAL(
"Can not convert 'ISerializeMessage' to 'BasicSerializeMessage'");
108 ARCCORE_FATAL(
"Invalid source '{0}' for send message (expected={1})",
119 Integer
nb_message = m_messages_to_process.size();
137 for( Integer i=0,
is=m_messages_to_process.size(); i<
is; ++i ){
139 msg->debug() <<
"Sorted message " << i
140 <<
" orig=" <<
pmsg->source()
141 <<
" dest=" <<
pmsg->destination()
142 <<
" tag=" <<
pmsg->internalTag()
143 <<
" send?=" <<
pmsg->isSend();
160 const bool do_old =
false;
164 new_request = m_dispatcher->legacySendSerializer(
pmsg->serializer(),{dest,tag,NonBlocking});
190 m_messages_to_process.clear();
196Integer MpiSerializeMessageList::
201 m_dispatcher->checkFinishedSubRequests();
208Integer MpiSerializeMessageList::
211 TimeMetricSentry
tphase(m_message_passing_phase);
213 while (_waitMessages2(WaitSome)!=(-1))
223Integer MpiSerializeMessageList::
224_waitMessages2(eWaitType wait_type)
226 Integer nb_message_finished = 0;
227 ITraceMng* msg = m_trace;
228 Integer nb_message = m_messages_request.size();
229 Int32 comm_rank = m_adapter->
commRank();
230 UniqueArray<MPI_Status> mpi_status(nb_message);
231 UniqueArray<Request> requests(nb_message);
232 UniqueArray<bool> done_indexes(nb_message);
233 done_indexes.fill(
false);
234 if (msg->verbosityLevel()>=6)
237 for( Integer z=0; z<nb_message; ++z ){
238 requests[z] = m_messages_request[z].m_request;
242 msg->info() <<
"Waiting for rank =" << comm_rank <<
" nb_message=" << nb_message;
244 for( Integer z=0; z<nb_message; ++z ){
245 internal::BasicSerializeMessage* msm = m_messages_request[z].m_mpi_message;
246 msg->info() <<
"Waiting for message: "
247 <<
" rank=" << comm_rank
248 <<
" issend=" << msm->isSend()
249 <<
" dest=" << msm->destination()
250 <<
" tag=" << msm->internalTag()
251 <<
" request=" << requests[z];
255 mpi_status.resize(nb_message);
256 MpiAdapter* adapter = m_adapter;
260 ARCCORE_FATAL(
"Bad value WaitAll");
262 msg->debug() <<
" rank=" << comm_rank <<
"Wait some " << nb_message;
264 adapter->waitSomeRequestsMPI(requests,done_indexes,mpi_status,
false);
267 msg->debug() <<
" rank=" << comm_rank <<
"Wait some non blocking " << nb_message;
269 adapter->waitSomeRequestsMPI(requests,done_indexes,mpi_status,
true);
273 catch(
const TimeoutException&){
274 std::ostringstream ostr;
275 for( Integer z=0; z<nb_message; ++z ){
276 internal::BasicSerializeMessage* message = m_messages_request[z].m_mpi_message;
277 ostr <<
"IndexReturn message: "
278 <<
" issend=" << message->isSend()
279 <<
" dest=" << message->destination()
280 <<
" done_index=" << done_indexes[z]
281 <<
" status_src=" << mpi_status[z].MPI_SOURCE
282 <<
" status_tag=" << mpi_status[z].MPI_TAG
283 <<
" status_err=" << mpi_status[z].MPI_ERROR
284 <<
" request=" << requests[z]
287 msg->pinfo() <<
"Info messages: myrank=" << comm_rank <<
" " << ostr.str();
291 for( Integer z=0; z<nb_message; ++z ){
292 internal::BasicSerializeMessage* message = m_messages_request[z].m_mpi_message;
293 bool is_send = message->isSend();
294 MessageRank destination = message->destination();
295 Int64 message_size = message->trueSerializer()->totalSize();
297 msg->info() <<
"IndexReturn message: Send: "
298 <<
" dest=" << destination
299 <<
" size=" << message_size
300 <<
" done_index=" << done_indexes[z]
301 <<
" request=" << requests[z];
303 msg->info() <<
"IndexReturn message: Recv: "
304 <<
" dest=" << destination
305 <<
" size=" << message_size
306 <<
" done_index=" << done_indexes[z]
307 <<
" request=" << requests[z]
308 <<
" status_src=" << mpi_status[z].MPI_SOURCE
309 <<
" status_tag=" << mpi_status[z].MPI_TAG
310 <<
" status_err=" << mpi_status[z].MPI_ERROR;
314 UniqueArray<MpiSerializeMessageRequest> new_messages;
316 int mpi_status_index = 0;
317 for( Integer i=0; i<nb_message; ++i ){
318 internal::BasicSerializeMessage* mpi_msg = m_messages_request[i].m_mpi_message;
319 if (done_indexes[i]){
320 MPI_Status status = mpi_status[mpi_status_index];
321 Request rq = requests[i];
327 MessageRank source(status.MPI_SOURCE);
328 MessageTag tag(status.MPI_TAG);
330 msg->info() <<
"Message number " << i <<
" Finished, source=" << source
332 <<
" err=" << status.MPI_ERROR
333 <<
" is_send=" << mpi_msg->isSend()
334 <<
" request=" << rq;
340 msg->info() <<
"Add new receive operation for message number " << i
342 new_messages.add(MpiSerializeMessageRequest(mpi_msg,r));
345 mpi_msg->setFinished(
true);
346 ++nb_message_finished;
351 msg->info() <<
"Message number " << i <<
" not finished"
352 <<
" request=" << requests[i];
353 new_messages.add(MpiSerializeMessageRequest(mpi_msg,requests[i]));
357 m_messages_request = new_messages;
358 if (m_messages_request.empty())
360 return nb_message_finished;
373 m_trace->
info() <<
"Process one message msg=" <<
this
374 <<
" number=" << message->messageNumber()
375 <<
" is_send=" << message->
isSend();
395 ARCCORE_FATAL(
"Can not use MPI_Mprobe with null rank. Use MessageRank::anySourceRank() instead");
402 m_trace->
info() <<
"Process one message (GlobalBuffer) msg=" <<
this
403 <<
" number=" << message->messageNumber()
404 <<
" is_send=" << message->
isSend()
407 <<
" (buf_size=" << m_dispatcher->serializeBufferSize() <<
")";
413 if (message->messageNumber()==0){
415 || message->
strategy()==ISerializeMessage::eStrategy::OneMessage){
416 sbuf->setFromSizes();
424 message->setMessageNumber(1);
427 sbuf->setFromSizes();
435createAndAddMessage(
MessageRank destination,ePointToPointMessageType type)
438 auto x = internal::BasicSerializeMessage::create(source,destination,type);
int commRank() const
Rang de cette instance dans le communicateur.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Request _processOneMessage(internal::BasicSerializeMessage *msm, MessageRank source, MessageTag mpi_tag)
Effectue la requête. Retourne une éventuelle requête si non nul.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
Request _processOneMessageGlobalBuffer(internal::BasicSerializeMessage *msm, MessageRank source, MessageTag mpi_tag)
Effectue la requête. Retourne une éventuelle requête si non nul.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Interface du gestionnaire de traces.
virtual TraceMessage info()=0
Flot pour un message d'information.
MessageRank destination() const override
Rang du destinataire (si isSend() est vrai) ou de l'envoyeur.
eStrategy strategy() const override
Stratégie utilisée pour les envois/réceptions.
bool isSend() const override
true s'il faut envoyer, false s'il faut recevoir
Référence à une instance.
bool isNull() const
Indique si le compteur référence une instance non nulle.
@ WaitSome
Attend qu'au moins un message de la liste soit traité.