Arcane  v3.14.10.0
Documentation utilisateur
Chargement...
Recherche...
Aucune correspondance
MpiSerializeMessageList.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiSerializeMessageList.cc (C) 2000-2024 */
9/* */
10/* Gestion des messages de sérialisation via MPI. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arccore/message_passing_mpi/MpiSerializeMessageList.h"
15#include "arccore/message_passing_mpi/MpiSerializeDispatcher.h"
16#include "arccore/message_passing_mpi/MpiAdapter.h"
17#include "arccore/message_passing/BasicSerializeMessage.h"
18#include "arccore/trace/ITraceMng.h"
19#include "arccore/base/FatalErrorException.h"
20#include "arccore/base/TimeoutException.h"
21#include "arccore/base/NotSupportedException.h"
22
23#include <algorithm>
24
25/*---------------------------------------------------------------------------*/
26/*---------------------------------------------------------------------------*/
27
28namespace Arccore::MessagePassing::Mpi
29{
30
31/*---------------------------------------------------------------------------*/
32/*---------------------------------------------------------------------------*/
33
35{
36 public:
37
38 bool operator()(const BasicSerializeMessage* m1,const BasicSerializeMessage* m2)
39 {
40 return _SortMessages::compare(m1,m2);
41 }
42
43 bool operator()(const ISerializeMessage* pm1,const ISerializeMessage* pm2)
44 {
45 return compare(pm1,pm2);
46 }
47 // Note: avec la version 16.5.3 (avril 2020) de VisualStudio, ce
48 // comparateur génère une exception en mode débug. Cela signifie qu'il
49 // n'est pas cohérent.
50 // TODO: corriger le problème
51 static bool compare(const ISerializeMessage* pm1,const ISerializeMessage* pm2)
52 {
53 MessageRank dest_p1 = pm1->destination();
54 MessageRank dest_p2 = pm2->destination();
55 MessageTag p1_tag = pm1->internalTag();
56 MessageTag p2_tag = pm2->internalTag();
57
58 // TODO: traiter le cas destRank()==A_NULL_RANK
59 if (dest_p1==dest_p2){
60 MessageRank orig_p1 = pm1->source();
61 MessageRank orig_p2 = pm2->source();
62
63 if (pm1->isSend()){
64 if (orig_p1==orig_p2 && (p1_tag!=p2_tag))
65 return p1_tag < p2_tag;
66 if (orig_p1<dest_p1)
67 return true;
68 }
69 if (!pm1->isSend()){
70 if (orig_p1==orig_p2 && (p1_tag!=p2_tag))
71 return p1_tag < p2_tag;
72 if (dest_p1<orig_p1)
73 return true;
74 }
75 return false;
76 }
77 if (dest_p1 < dest_p2)
78 return true;
79 return false;
80 }
81};
82
83/*---------------------------------------------------------------------------*/
84/*---------------------------------------------------------------------------*/
85
86/*---------------------------------------------------------------------------*/
87/*---------------------------------------------------------------------------*/
88
89MpiSerializeMessageList::
90MpiSerializeMessageList(MpiSerializeDispatcher* dispatcher)
91: m_dispatcher(dispatcher)
92, m_adapter(dispatcher->adapter())
93, m_trace(m_adapter->traceMng())
94, m_message_passing_phase(timeMetricPhaseMessagePassing(m_adapter->timeMetricCollector()))
95{}
96
97/*---------------------------------------------------------------------------*/
98/*---------------------------------------------------------------------------*/
99
102{
103 BasicSerializeMessage* true_message = dynamic_cast<BasicSerializeMessage*>(message);
104 if (!true_message)
105 ARCCORE_FATAL("Can not convert 'ISerializeMessage' to 'BasicSerializeMessage'");
106 if (true_message->isSend() && true_message->source()!=MessageRank(m_adapter->commRank()))
107 ARCCORE_FATAL("Invalid source '{0}' for send message (expected={1})",
108 true_message->source(),m_adapter->commRank());
109 m_messages_to_process.add(true_message);
110}
111
112/*---------------------------------------------------------------------------*/
113/*---------------------------------------------------------------------------*/
114
117{
118 Integer nb_message = m_messages_to_process.size();
119 if (nb_message==0)
120 return;
121
122 // L'envoie de messages peut se faire en mode bloquant ou non bloquant.
123 // Quel que soit le mode, l'ordre d'envoie doit permettre de ne pas
124 // avoir de deadlock. Pour cela, on applique l'algorithme suivant:
125 // - chaque processeur effectue ces envois et réceptions dans l'ordre
126 // croissant des rang de processeurs
127 // - lorsque deux processeurs communiquent, c'est celui dont le rang est
128 // le plus faible qui envoie ces messages d'abord.
129 ITraceMng* msg = m_trace;
130 // NOTE (avril 2020): n'appelle plus le tri car il semble que l'opérateur
131 // de comparaison ne soit pas cohérent. De plus, il n'est normalement
132 // plus nécessaire de faire ce tri car tout est non bloquant.
133 //std::stable_sort(std::begin(m_messages_to_process),std::end(m_messages_to_process),_SortMessages());
134 const bool print_sorted = false;
135 if (print_sorted){
136 for( Integer i=0, is=m_messages_to_process.size(); i<is; ++i ){
137 ISerializeMessage* pmsg = m_messages_to_process[i];
138 msg->debug() << "Sorted message " << i
139 << " orig=" << pmsg->source()
140 << " dest=" << pmsg->destination()
141 << " tag=" << pmsg->internalTag()
142 << " send?=" << pmsg->isSend();
143 }
144 }
145
146 Int64 serialize_buffer_size = m_dispatcher->serializeBufferSize();
147 for( Integer i=0; i<nb_message; ++i ){
148 BasicSerializeMessage* mpi_msg = m_messages_to_process[i];
149 ISerializeMessage* pmsg = mpi_msg;
150 Request new_request;
151 MessageRank dest = pmsg->destination();
152 MessageTag tag = pmsg->internalTag();
153 bool is_one_message_strategy = (pmsg->strategy()==ISerializeMessage::eStrategy::OneMessage);
154 if (pmsg->isSend()){
155 // TODO: il faut utiliser m_dispatcher->sendSerializer() à la place
156 // de legacySendSerializer() mais avant de fair cela il faut envoyer
157 // les deux messages potentiels en même temps pour des raisons de
158 // performance (voir MpiSerializeDispatcher::sendSerializer())
159 const bool do_old = false;
160 if (do_old){
161 if (is_one_message_strategy)
162 ARCCORE_THROW(NotSupportedException,"OneMessage strategy with legacy send serializer");
163 new_request = m_dispatcher->legacySendSerializer(pmsg->serializer(),{dest,tag,NonBlocking});
164 }
165 else
166 new_request = m_dispatcher->sendSerializer(pmsg->serializer(),{dest,tag,NonBlocking},is_one_message_strategy);
167 }
168 else{
169 BasicSerializer* sbuf = mpi_msg->trueSerializer();
170 sbuf->preallocate(serialize_buffer_size);
171 MessageId message_id = pmsg->_internalMessageId();
172 if (message_id.isValid()){
173 // Message de sérialisation utilisant MPI_Message
174 // 'message_id' contient la taille du message final. On préalloue donc
175 // le buffer de réception à cette taille ce qui permet si besoin de ne faire
176 // qu'un seul message de réception.
177 // préallouer le buffer à la taille né
178 if (is_one_message_strategy)
179 sbuf->preallocate(message_id.sourceInfo().size());
180 new_request = m_dispatcher->_recvSerializerBytes(sbuf->globalBuffer(),message_id,false);
181 }
182 else
183 new_request = m_dispatcher->_recvSerializerBytes(sbuf->globalBuffer(),dest,tag,false);
184 }
185 mpi_msg->setIsProcessed(true);
186 m_messages_request.add(MpiSerializeMessageRequest(mpi_msg,new_request));
187 }
188 // Plus de messages à exécuter
189 m_messages_to_process.clear();
190}
191
192/*---------------------------------------------------------------------------*/
193/*---------------------------------------------------------------------------*/
194
196waitMessages(eWaitType wait_type)
197{
199 Integer n = _waitMessages(wait_type);
200 m_dispatcher->checkFinishedSubRequests();
201 return n;
202}
203
204/*---------------------------------------------------------------------------*/
205/*---------------------------------------------------------------------------*/
206
207Integer MpiSerializeMessageList::
208_waitMessages(eWaitType wait_type)
209{
210 TimeMetricSentry tphase(m_message_passing_phase);
211 if (wait_type==WaitAll){
212 while (_waitMessages2(WaitSome)!=(-1))
213 ;
214 return (-1);
215 }
216 return _waitMessages2(wait_type);
217}
218
219/*---------------------------------------------------------------------------*/
220/*---------------------------------------------------------------------------*/
221
222Integer MpiSerializeMessageList::
223_waitMessages2(eWaitType wait_type)
224{
225 Integer nb_message_finished = 0;
226 ITraceMng* msg = m_trace;
227 Integer nb_message = m_messages_request.size();
228 Int32 comm_rank = m_adapter->commRank();
229 UniqueArray<MPI_Status> mpi_status(nb_message);
230 UniqueArray<Request> requests(nb_message);
231 UniqueArray<bool> done_indexes(nb_message);
232 done_indexes.fill(false);
233 if (msg->verbosityLevel()>=6)
234 m_is_verbose = true;
235
236 for( Integer z=0; z<nb_message; ++z ){
237 requests[z] = m_messages_request[z].m_request;
238 }
239
240 if (m_is_verbose){
241 msg->info() << "Waiting for rank =" << comm_rank << " nb_message=" << nb_message;
242
243 for( Integer z=0; z<nb_message; ++z ){
244 BasicSerializeMessage* msm = m_messages_request[z].m_mpi_message;
245 msg->info() << "Waiting for message: "
246 << " rank=" << comm_rank
247 << " issend=" << msm->isSend()
248 << " dest=" << msm->destination()
249 << " tag=" << msm->internalTag()
250 << " request=" << requests[z];
251 }
252 }
253
254 mpi_status.resize(nb_message);
255 MpiAdapter* adapter = m_adapter;
256 try{
257 switch(wait_type){
258 case WaitAll:
259 ARCCORE_FATAL("Bad value WaitAll");
260 case WaitSome:
261 msg->debug() << " rank=" << comm_rank << "Wait some " << nb_message;
262 if (nb_message>0)
263 adapter->waitSomeRequestsMPI(requests,done_indexes,mpi_status,false);
264 break;
266 msg->debug() << " rank=" << comm_rank << "Wait some non blocking " << nb_message;
267 if (nb_message>0)
268 adapter->waitSomeRequestsMPI(requests,done_indexes,mpi_status,true);
269 break;
270 }
271 }
272 catch(const TimeoutException&){
273 std::ostringstream ostr;
274 for( Integer z=0; z<nb_message; ++z ){
275 BasicSerializeMessage* message = m_messages_request[z].m_mpi_message;
276 ostr << "IndexReturn message: "
277 << " issend=" << message->isSend()
278 << " dest=" << message->destination()
279 << " done_index=" << done_indexes[z]
280 << " status_src=" << mpi_status[z].MPI_SOURCE
281 << " status_tag=" << mpi_status[z].MPI_TAG
282 << " status_err=" << mpi_status[z].MPI_ERROR
283 << " request=" << requests[z]
284 << "\n";
285 }
286 msg->pinfo() << "Info messages: myrank=" << comm_rank << " " << ostr.str();
287 throw;
288 }
289 if (m_is_verbose){
290 for( Integer z=0; z<nb_message; ++z ){
291 BasicSerializeMessage* message = m_messages_request[z].m_mpi_message;
292 bool is_send = message->isSend();
293 MessageRank destination = message->destination();
294 Int64 message_size = message->trueSerializer()->totalSize();
295 if (is_send)
296 msg->info() << "IndexReturn message: Send: "
297 << " dest=" << destination
298 << " size=" << message_size
299 << " done_index=" << done_indexes[z]
300 << " request=" << requests[z];
301 else
302 msg->info() << "IndexReturn message: Recv: "
303 << " dest=" << destination
304 << " size=" << message_size
305 << " done_index=" << done_indexes[z]
306 << " request=" << requests[z]
307 << " status_src=" << mpi_status[z].MPI_SOURCE
308 << " status_tag=" << mpi_status[z].MPI_TAG
309 << " status_err=" << mpi_status[z].MPI_ERROR;
310 }
311 }
312
313 UniqueArray<MpiSerializeMessageRequest> new_messages;
314
315 int mpi_status_index = 0;
316 for( Integer i=0; i<nb_message; ++i ){
317 BasicSerializeMessage* mpi_msg = m_messages_request[i].m_mpi_message;
318 if (done_indexes[i]){
319 MPI_Status status = mpi_status[mpi_status_index];
320 Request rq = requests[i];
321 // NOTE: les valeurs MPI_SOURCE et MPI_TAG de status ne sont
322 // valides que pour les réception. On utilise ces valeurs et pas celles
323 // de ISerializeMessage car il est possible de spécifier MPI_ANY_TAG ou
324 // MPI_ANY_SOURCE dans les messages et on a besoin de connaitre les bonnes
325 // valeurs pour réceptionner le second message.
326 MessageRank source(status.MPI_SOURCE);
327 MessageTag tag(status.MPI_TAG);
328 if (m_is_verbose){
329 msg->info() << "Message number " << i << " Finished, source=" << source
330 << " tag=" << tag
331 << " err=" << status.MPI_ERROR
332 << " is_send=" << mpi_msg->isSend()
333 << " request=" << rq;
334 }
335 ++mpi_status_index;
336 Request r = _processOneMessage(mpi_msg,source,tag);
337 if (r.isValid()){
338 if (m_is_verbose)
339 msg->info() << "Add new receive operation for message number " << i
340 << " request=" << r;
341 new_messages.add(MpiSerializeMessageRequest(mpi_msg,r));
342 }
343 else{
344 mpi_msg->setFinished(true);
345 ++nb_message_finished;
346 }
347 }
348 else{
349 if (m_is_verbose)
350 msg->info() << "Message number " << i << " not finished"
351 << " request=" << requests[i];
352 new_messages.add(MpiSerializeMessageRequest(mpi_msg,requests[i]));
353 }
354 }
355 msg->flush();
356 m_messages_request = new_messages;
357 if (m_messages_request.empty())
358 return (-1);
359 return nb_message_finished;
360}
361
362/*---------------------------------------------------------------------------*/
363/*---------------------------------------------------------------------------*/
364/*!
365 * \brief Effectue la requête. Retourne une éventuelle requête si non nul.
366 */
369{
370 Request request;
371 if (m_is_verbose)
372 m_trace->info() << "Process one message msg=" << this
373 << " number=" << message->messageNumber()
374 << " is_send=" << message->isSend();
375 if (message->isSend())
376 return request;
377 return _processOneMessageGlobalBuffer(message,source,mpi_tag);
378}
379
380/*---------------------------------------------------------------------------*/
381/*---------------------------------------------------------------------------*/
382/*!
383 * \brief Effectue la requête. Retourne une éventuelle requête si non nul.
384 */
387{
388 Request request;
389 BasicSerializer* sbuf = message->trueSerializer();
390 Int64 message_size = sbuf->totalSize();
391
392 MessageRank dest_rank = message->destination();
393 if (dest_rank.isNull() && !m_adapter->isAllowNullRankForAnySource())
394 ARCCORE_FATAL("Can not use MPI_Mprobe with null rank. Use MessageRank::anySourceRank() instead");
395
396 if (dest_rank.isNull() || dest_rank.isAnySource())
397 // Signifie que le message était un MPI_ANY_SOURCE
398 dest_rank = source;
399
400 if (m_is_verbose){
401 m_trace->info() << "Process one message (GlobalBuffer) msg=" << this
402 << " number=" << message->messageNumber()
403 << " is_send=" << message->isSend()
404 << " dest_rank=" << dest_rank
405 << " size=" << message_size
406 << " (buf_size=" << m_dispatcher->serializeBufferSize() << ")";
407 }
408
409 // S'il s'agit du premier message, récupère la longueur totale.
410 // et si le message total est trop gros (>m_serialize_buffer_size)
411 // poste un nouveau message pour récupèrer les données sérialisées.
412 if (message->messageNumber()==0){
413 if (message_size<=m_dispatcher->serializeBufferSize()
415 sbuf->setFromSizes();
416 return request;
417 }
418 m_dispatcher->_checkBigMessage(message_size);
419 sbuf->preallocate(message_size);
420 Span<Byte> bytes = sbuf->globalBuffer();
421 MessageTag next_tag = MpiSerializeDispatcher::nextSerializeTag(mpi_tag);
422 request = m_dispatcher->_recvSerializerBytes(bytes,dest_rank,next_tag,false);
423 message->setMessageNumber(1);
424 return request;
425 }
426 sbuf->setFromSizes();
427 return request;
428}
429
430/*---------------------------------------------------------------------------*/
431/*---------------------------------------------------------------------------*/
432
433Ref<ISerializeMessage> MpiSerializeMessageList::
435{
436 MessageRank source(m_adapter->commRank());
437 auto x = BasicSerializeMessage::create(source,destination,type);
438 addMessage(x.get());
439 return x;
440}
441
442/*---------------------------------------------------------------------------*/
443/*---------------------------------------------------------------------------*/
444
445} // End namespace Arccore::MessagePassing::Mpi
446
447/*---------------------------------------------------------------------------*/
448/*---------------------------------------------------------------------------*/
Integer size() const
Nombre d'éléments du vecteur.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
void clear()
Supprime les éléments du tableau.
Interface du gestionnaire de traces.
virtual TraceMessageDbg debug(Trace::eDebugLevel=Trace::Medium)=0
Flot pour un message de debug.
virtual TraceMessage pinfo()=0
Flot pour un message d'information parallèle.
virtual Int32 verbosityLevel() const =0
Niveau de verbosité des messages.
virtual TraceMessage info()=0
Flot pour un message d'information.
virtual void flush()=0
Flush tous les flots.
virtual ISerializer * serializer()=0
Sérialiseur.
virtual eStrategy strategy() const =0
Stratégie utilisée pour les envois/réceptions.
virtual MessageRank destination() const =0
Rang du destinataire (si isSend() est vrai) ou de l'envoyeur.
virtual bool isSend() const =0
true s'il faut envoyer, false s'il faut recevoir
virtual MessageRank source() const =0
Rang de l'envoyeur du message.
@ OneMessage
Stratégie utilisant un seul message si possible.
MessageSourceInfo sourceInfo() const
Informations sur la source du message;.
Definition MessageId.h:153
bool isAnySource() const
Vrai si rang correspondant à anySourceRank()
Definition MessageRank.h:81
bool isNull() const
Vrai si rang non initialisé correspondant au rang par défaut.
Definition MessageRank.h:78
Int64 size() const
Taille du message.
int commRank() const
Rang de cette instance dans le communicateur.
Definition MpiAdapter.h:143
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Request _processOneMessage(BasicSerializeMessage *msm, MessageRank source, MessageTag mpi_tag)
Effectue la requête. Retourne une éventuelle requête si non nul.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
Integer waitMessages(eWaitType wait_type) override
Attend que les messages aient terminé leur exécution.
Request _processOneMessageGlobalBuffer(BasicSerializeMessage *msm, MessageRank source, MessageTag mpi_tag)
Effectue la requête. Retourne une éventuelle requête si non nul.
Ref< ISerializeMessage > createAndAddMessage(MessageRank destination, ePointToPointMessageType type) override
Créé et ajoute un message de sérialisation.
Requête d'un message.
Definition Request.h:77
MessageRank destination() const override
Rang du destinataire (si isSend() est vrai) ou de l'envoyeur.
eStrategy strategy() const override
Stratégie utilisée pour les envois/réceptions.
bool isSend() const override
true s'il faut envoyer, false s'il faut recevoir
MessageRank source() const override
Rang de l'envoyeur du message.
Vue d'un tableau d'éléments de type T.
Definition Span.h:510
Sentinelle pour collecter les informations temporelles.
Definition TimeMetric.h:101
Vecteur 1D de données avec sémantique par valeur (style STL).
ePointToPointMessageType
Type de message point à point.
@ WaitSome
Attend que tous les messages de la liste soient traités.
Int32 Integer
Type représentant un entier.
std::int64_t Int64
Type entier signé sur 64 bits.
std::int32_t Int32
Type entier signé sur 32 bits.