Arcane  v3.15.3.0
Documentation utilisateur
Chargement...
Recherche...
Aucune correspondance
MpiSerializeMessageList.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiSerializeMessageList.cc (C) 2000-2025 */
9/* */
10/* Gestion des messages de sérialisation via MPI. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arccore/message_passing_mpi/MpiSerializeMessageList.h"
15#include "arccore/message_passing_mpi/MpiSerializeDispatcher.h"
16#include "arccore/message_passing_mpi/MpiAdapter.h"
17#include "arccore/message_passing/BasicSerializeMessage.h"
18#include "arccore/trace/ITraceMng.h"
19#include "arccore/base/FatalErrorException.h"
20#include "arccore/base/TimeoutException.h"
21#include "arccore/base/NotSupportedException.h"
22
23#include <algorithm>
24
25/*---------------------------------------------------------------------------*/
26/*---------------------------------------------------------------------------*/
27
28namespace Arcane::MessagePassing::Mpi
29{
30
31/*---------------------------------------------------------------------------*/
32/*---------------------------------------------------------------------------*/
33
35{
36 public:
37
38 bool operator()(const internal::BasicSerializeMessage* m1,
40 {
41 return _SortMessages::compare(m1,m2);
42 }
43
44 bool operator()(const ISerializeMessage* pm1,const ISerializeMessage* pm2)
45 {
46 return compare(pm1,pm2);
47 }
48 // Note: avec la version 16.5.3 (avril 2020) de VisualStudio, ce
49 // comparateur génère une exception en mode débug. Cela signifie qu'il
50 // n'est pas cohérent.
51 // TODO: corriger le problème
52 static bool compare(const ISerializeMessage* pm1,const ISerializeMessage* pm2)
53 {
54 MessageRank dest_p1 = pm1->destination();
55 MessageRank dest_p2 = pm2->destination();
56 MessageTag p1_tag = pm1->internalTag();
57 MessageTag p2_tag = pm2->internalTag();
58
59 // TODO: traiter le cas destRank()==A_NULL_RANK
60 if (dest_p1==dest_p2){
61 MessageRank orig_p1 = pm1->source();
62 MessageRank orig_p2 = pm2->source();
63
64 if (pm1->isSend()){
65 if (orig_p1==orig_p2 && (p1_tag!=p2_tag))
66 return p1_tag < p2_tag;
67 if (orig_p1<dest_p1)
68 return true;
69 }
70 if (!pm1->isSend()){
71 if (orig_p1==orig_p2 && (p1_tag!=p2_tag))
72 return p1_tag < p2_tag;
73 if (dest_p1<orig_p1)
74 return true;
75 }
76 return false;
77 }
78 if (dest_p1 < dest_p2)
79 return true;
80 return false;
81 }
82};
83
84/*---------------------------------------------------------------------------*/
85/*---------------------------------------------------------------------------*/
86
87/*---------------------------------------------------------------------------*/
88/*---------------------------------------------------------------------------*/
89
90MpiSerializeMessageList::
91MpiSerializeMessageList(MpiSerializeDispatcher* dispatcher)
92: m_dispatcher(dispatcher)
93, m_adapter(dispatcher->adapter())
94, m_trace(m_adapter->traceMng())
95, m_message_passing_phase(timeMetricPhaseMessagePassing(m_adapter->timeMetricCollector()))
96{}
97
98/*---------------------------------------------------------------------------*/
99/*---------------------------------------------------------------------------*/
100
103{
104 auto* true_message = dynamic_cast<internal::BasicSerializeMessage*>(message);
105 if (!true_message)
106 ARCCORE_FATAL("Can not convert 'ISerializeMessage' to 'BasicSerializeMessage'");
107 if (true_message->isSend() && true_message->source()!=MessageRank(m_adapter->commRank()))
108 ARCCORE_FATAL("Invalid source '{0}' for send message (expected={1})",
109 true_message->source(),m_adapter->commRank());
110 m_messages_to_process.add(true_message);
111}
112
113/*---------------------------------------------------------------------------*/
114/*---------------------------------------------------------------------------*/
115
118{
119 Integer nb_message = m_messages_to_process.size();
120 if (nb_message==0)
121 return;
122
123 // L'envoie de messages peut se faire en mode bloquant ou non bloquant.
124 // Quel que soit le mode, l'ordre d'envoie doit permettre de ne pas
125 // avoir de deadlock. Pour cela, on applique l'algorithme suivant:
126 // - chaque processeur effectue ces envois et réceptions dans l'ordre
127 // croissant des rang de processeurs
128 // - lorsque deux processeurs communiquent, c'est celui dont le rang est
129 // le plus faible qui envoie ces messages d'abord.
130 ITraceMng* msg = m_trace;
131 // NOTE (avril 2020): n'appelle plus le tri car il semble que l'opérateur
132 // de comparaison ne soit pas cohérent. De plus, il n'est normalement
133 // plus nécessaire de faire ce tri car tout est non bloquant.
134 //std::stable_sort(std::begin(m_messages_to_process),std::end(m_messages_to_process),_SortMessages());
135 const bool print_sorted = false;
136 if (print_sorted){
137 for( Integer i=0, is=m_messages_to_process.size(); i<is; ++i ){
138 ISerializeMessage* pmsg = m_messages_to_process[i];
139 msg->debug() << "Sorted message " << i
140 << " orig=" << pmsg->source()
141 << " dest=" << pmsg->destination()
142 << " tag=" << pmsg->internalTag()
143 << " send?=" << pmsg->isSend();
144 }
145 }
146
147 Int64 serialize_buffer_size = m_dispatcher->serializeBufferSize();
148 for( Integer i=0; i<nb_message; ++i ){
149 internal::BasicSerializeMessage* mpi_msg = m_messages_to_process[i];
152 MessageRank dest = pmsg->destination();
153 MessageTag tag = pmsg->internalTag();
154 bool is_one_message_strategy = (pmsg->strategy()==ISerializeMessage::eStrategy::OneMessage);
155 if (pmsg->isSend()){
156 // TODO: il faut utiliser m_dispatcher->sendSerializer() à la place
157 // de legacySendSerializer() mais avant de fair cela il faut envoyer
158 // les deux messages potentiels en même temps pour des raisons de
159 // performance (voir MpiSerializeDispatcher::sendSerializer())
160 const bool do_old = false;
161 if (do_old){
163 ARCCORE_THROW(NotSupportedException,"OneMessage strategy with legacy send serializer");
164 new_request = m_dispatcher->legacySendSerializer(pmsg->serializer(),{dest,tag,NonBlocking});
165 }
166 else
167 new_request = m_dispatcher->sendSerializer(pmsg->serializer(),{dest,tag,NonBlocking},is_one_message_strategy);
168 }
169 else{
170 BasicSerializer* sbuf = mpi_msg->trueSerializer();
171 sbuf->preallocate(serialize_buffer_size);
172 MessageId message_id = pmsg->_internalMessageId();
173 if (message_id.isValid()){
174 // Message de sérialisation utilisant MPI_Message
175 // 'message_id' contient la taille du message final. On préalloue donc
176 // le buffer de réception à cette taille ce qui permet si besoin de ne faire
177 // qu'un seul message de réception.
178 // préallouer le buffer à la taille né
180 sbuf->preallocate(message_id.sourceInfo().size());
181 new_request = m_dispatcher->_recvSerializerBytes(sbuf->globalBuffer(),message_id,false);
182 }
183 else
184 new_request = m_dispatcher->_recvSerializerBytes(sbuf->globalBuffer(),dest,tag,false);
185 }
186 mpi_msg->setIsProcessed(true);
187 m_messages_request.add(MpiSerializeMessageRequest(mpi_msg,new_request));
188 }
189 // Plus de messages à exécuter
190 m_messages_to_process.clear();
191}
192
193/*---------------------------------------------------------------------------*/
194/*---------------------------------------------------------------------------*/
195
196Integer MpiSerializeMessageList::
197waitMessages(eWaitType wait_type)
198{
200 Integer n = _waitMessages(wait_type);
201 m_dispatcher->checkFinishedSubRequests();
202 return n;
203}
204
205/*---------------------------------------------------------------------------*/
206/*---------------------------------------------------------------------------*/
207
208Integer MpiSerializeMessageList::
209_waitMessages(eWaitType wait_type)
210{
211 TimeMetricSentry tphase(m_message_passing_phase);
212 if (wait_type==WaitAll){
213 while (_waitMessages2(WaitSome)!=(-1))
214 ;
215 return (-1);
216 }
217 return _waitMessages2(wait_type);
218}
219
220/*---------------------------------------------------------------------------*/
221/*---------------------------------------------------------------------------*/
222
223Integer MpiSerializeMessageList::
224_waitMessages2(eWaitType wait_type)
225{
226 Integer nb_message_finished = 0;
227 ITraceMng* msg = m_trace;
228 Integer nb_message = m_messages_request.size();
229 Int32 comm_rank = m_adapter->commRank();
230 UniqueArray<MPI_Status> mpi_status(nb_message);
231 UniqueArray<Request> requests(nb_message);
232 UniqueArray<bool> done_indexes(nb_message);
233 done_indexes.fill(false);
234 if (msg->verbosityLevel()>=6)
235 m_is_verbose = true;
236
237 for( Integer z=0; z<nb_message; ++z ){
238 requests[z] = m_messages_request[z].m_request;
239 }
240
241 if (m_is_verbose){
242 msg->info() << "Waiting for rank =" << comm_rank << " nb_message=" << nb_message;
243
244 for( Integer z=0; z<nb_message; ++z ){
245 internal::BasicSerializeMessage* msm = m_messages_request[z].m_mpi_message;
246 msg->info() << "Waiting for message: "
247 << " rank=" << comm_rank
248 << " issend=" << msm->isSend()
249 << " dest=" << msm->destination()
250 << " tag=" << msm->internalTag()
251 << " request=" << requests[z];
252 }
253 }
254
255 mpi_status.resize(nb_message);
256 MpiAdapter* adapter = m_adapter;
257 try{
258 switch(wait_type){
259 case WaitAll:
260 ARCCORE_FATAL("Bad value WaitAll");
261 case WaitSome:
262 msg->debug() << " rank=" << comm_rank << "Wait some " << nb_message;
263 if (nb_message>0)
264 adapter->waitSomeRequestsMPI(requests,done_indexes,mpi_status,false);
265 break;
267 msg->debug() << " rank=" << comm_rank << "Wait some non blocking " << nb_message;
268 if (nb_message>0)
269 adapter->waitSomeRequestsMPI(requests,done_indexes,mpi_status,true);
270 break;
271 }
272 }
273 catch(const TimeoutException&){
274 std::ostringstream ostr;
275 for( Integer z=0; z<nb_message; ++z ){
276 internal::BasicSerializeMessage* message = m_messages_request[z].m_mpi_message;
277 ostr << "IndexReturn message: "
278 << " issend=" << message->isSend()
279 << " dest=" << message->destination()
280 << " done_index=" << done_indexes[z]
281 << " status_src=" << mpi_status[z].MPI_SOURCE
282 << " status_tag=" << mpi_status[z].MPI_TAG
283 << " status_err=" << mpi_status[z].MPI_ERROR
284 << " request=" << requests[z]
285 << "\n";
286 }
287 msg->pinfo() << "Info messages: myrank=" << comm_rank << " " << ostr.str();
288 throw;
289 }
290 if (m_is_verbose){
291 for( Integer z=0; z<nb_message; ++z ){
292 internal::BasicSerializeMessage* message = m_messages_request[z].m_mpi_message;
293 bool is_send = message->isSend();
294 MessageRank destination = message->destination();
295 Int64 message_size = message->trueSerializer()->totalSize();
296 if (is_send)
297 msg->info() << "IndexReturn message: Send: "
298 << " dest=" << destination
299 << " size=" << message_size
300 << " done_index=" << done_indexes[z]
301 << " request=" << requests[z];
302 else
303 msg->info() << "IndexReturn message: Recv: "
304 << " dest=" << destination
305 << " size=" << message_size
306 << " done_index=" << done_indexes[z]
307 << " request=" << requests[z]
308 << " status_src=" << mpi_status[z].MPI_SOURCE
309 << " status_tag=" << mpi_status[z].MPI_TAG
310 << " status_err=" << mpi_status[z].MPI_ERROR;
311 }
312 }
313
314 UniqueArray<MpiSerializeMessageRequest> new_messages;
315
316 int mpi_status_index = 0;
317 for( Integer i=0; i<nb_message; ++i ){
318 internal::BasicSerializeMessage* mpi_msg = m_messages_request[i].m_mpi_message;
319 if (done_indexes[i]){
320 MPI_Status status = mpi_status[mpi_status_index];
321 Request rq = requests[i];
322 // NOTE: les valeurs MPI_SOURCE et MPI_TAG de status ne sont
323 // valides que pour les réception. On utilise ces valeurs et pas celles
324 // de ISerializeMessage car il est possible de spécifier MPI_ANY_TAG ou
325 // MPI_ANY_SOURCE dans les messages et on a besoin de connaitre les bonnes
326 // valeurs pour réceptionner le second message.
327 MessageRank source(status.MPI_SOURCE);
328 MessageTag tag(status.MPI_TAG);
329 if (m_is_verbose){
330 msg->info() << "Message number " << i << " Finished, source=" << source
331 << " tag=" << tag
332 << " err=" << status.MPI_ERROR
333 << " is_send=" << mpi_msg->isSend()
334 << " request=" << rq;
335 }
336 ++mpi_status_index;
337 Request r = _processOneMessage(mpi_msg,source,tag);
338 if (r.isValid()){
339 if (m_is_verbose)
340 msg->info() << "Add new receive operation for message number " << i
341 << " request=" << r;
342 new_messages.add(MpiSerializeMessageRequest(mpi_msg,r));
343 }
344 else{
345 mpi_msg->setFinished(true);
346 ++nb_message_finished;
347 }
348 }
349 else{
350 if (m_is_verbose)
351 msg->info() << "Message number " << i << " not finished"
352 << " request=" << requests[i];
353 new_messages.add(MpiSerializeMessageRequest(mpi_msg,requests[i]));
354 }
355 }
356 msg->flush();
357 m_messages_request = new_messages;
358 if (m_messages_request.empty())
359 return (-1);
360 return nb_message_finished;
361}
362
363/*---------------------------------------------------------------------------*/
364/*---------------------------------------------------------------------------*/
365/*!
366 * \brief Effectue la requête. Retourne une éventuelle requête si non nul.
367 */
370{
371 Request request;
372 if (m_is_verbose)
373 m_trace->info() << "Process one message msg=" << this
374 << " number=" << message->messageNumber()
375 << " is_send=" << message->isSend();
376 if (message->isSend())
377 return request;
378 return _processOneMessageGlobalBuffer(message,source,mpi_tag);
379}
380
381/*---------------------------------------------------------------------------*/
382/*---------------------------------------------------------------------------*/
383/*!
384 * \brief Effectue la requête. Retourne une éventuelle requête si non nul.
385 */
388{
389 Request request;
390 BasicSerializer* sbuf = message->trueSerializer();
391 Int64 message_size = sbuf->totalSize();
392
393 MessageRank dest_rank = message->destination();
394 if (dest_rank.isNull() && !m_adapter->isAllowNullRankForAnySource())
395 ARCCORE_FATAL("Can not use MPI_Mprobe with null rank. Use MessageRank::anySourceRank() instead");
396
397 if (dest_rank.isNull() || dest_rank.isAnySource())
398 // Signifie que le message était un MPI_ANY_SOURCE
399 dest_rank = source;
400
401 if (m_is_verbose){
402 m_trace->info() << "Process one message (GlobalBuffer) msg=" << this
403 << " number=" << message->messageNumber()
404 << " is_send=" << message->isSend()
405 << " dest_rank=" << dest_rank
406 << " size=" << message_size
407 << " (buf_size=" << m_dispatcher->serializeBufferSize() << ")";
408 }
409
410 // S'il s'agit du premier message, récupère la longueur totale.
411 // et si le message total est trop gros (>m_serialize_buffer_size)
412 // poste un nouveau message pour récupèrer les données sérialisées.
413 if (message->messageNumber()==0){
414 if (message_size<=m_dispatcher->serializeBufferSize()
415 || message->strategy()==ISerializeMessage::eStrategy::OneMessage){
416 sbuf->setFromSizes();
417 return request;
418 }
419 m_dispatcher->_checkBigMessage(message_size);
420 sbuf->preallocate(message_size);
421 Span<Byte> bytes = sbuf->globalBuffer();
422 MessageTag next_tag = MpiSerializeDispatcher::nextSerializeTag(mpi_tag);
423 request = m_dispatcher->_recvSerializerBytes(bytes,dest_rank,next_tag,false);
424 message->setMessageNumber(1);
425 return request;
426 }
427 sbuf->setFromSizes();
428 return request;
429}
430
431/*---------------------------------------------------------------------------*/
432/*---------------------------------------------------------------------------*/
433
434Ref<ISerializeMessage> MpiSerializeMessageList::
435createAndAddMessage(MessageRank destination,ePointToPointMessageType type)
436{
437 MessageRank source(m_adapter->commRank());
438 auto x = internal::BasicSerializeMessage::create(source,destination,type);
439 addMessage(x.get());
440 return x;
441}
442
443/*---------------------------------------------------------------------------*/
444/*---------------------------------------------------------------------------*/
445
446} // End namespace Arccore::MessagePassing::Mpi
447
448/*---------------------------------------------------------------------------*/
449/*---------------------------------------------------------------------------*/
int commRank() const
Rang de cette instance dans le communicateur.
Definition MpiAdapter.h:143
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Request _processOneMessage(internal::BasicSerializeMessage *msm, MessageRank source, MessageTag mpi_tag)
Effectue la requête. Retourne une éventuelle requête si non nul.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
Request _processOneMessageGlobalBuffer(internal::BasicSerializeMessage *msm, MessageRank source, MessageTag mpi_tag)
Effectue la requête. Retourne une éventuelle requête si non nul.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Interface du gestionnaire de traces.
virtual TraceMessage info()=0
Flot pour un message d'information.
Requête d'un message.
Definition Request.h:77
MessageRank destination() const override
Rang du destinataire (si isSend() est vrai) ou de l'envoyeur.
eStrategy strategy() const override
Stratégie utilisée pour les envois/réceptions.
bool isSend() const override
true s'il faut envoyer, false s'il faut recevoir
Référence à une instance.
bool isNull() const
Indique si le compteur référence une instance non nulle.
@ WaitSome
Attend que tous les messages de la liste soient traités.