Arcane  v3.16.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
ParallelExchanger.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ParallelExchanger.cc (C) 2000-2025 */
9/* */
10/* Echange d'informations entre processeurs. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/impl/ParallelExchanger.h"
15
16#include "arcane/utils/NotSupportedException.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/PlatformUtils.h"
19
21#include "arcane/core/IParallelMng.h"
22#include "arcane/core/SerializeBuffer.h"
23#include "arcane/core/Timer.h"
24#include "arcane/core/ISerializeMessageList.h"
25#include "arcane/core/internal/SerializeMessage.h"
26
27#include <algorithm>
28
29/*---------------------------------------------------------------------------*/
30/*---------------------------------------------------------------------------*/
31
32namespace Arcane
33{
34
35/*---------------------------------------------------------------------------*/
36/*---------------------------------------------------------------------------*/
37
38ParallelExchanger::
39ParallelExchanger(IParallelMng* pm)
41{
42}
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
46
47ParallelExchanger::
48ParallelExchanger(Ref<IParallelMng> pm)
49: TraceAccessor(pm->traceMng())
50, m_parallel_mng(pm)
51, m_timer(pm->timerMng(),"ParallelExchangerTimer",Timer::TimerReal)
52{
53 String use_collective_str = platform::getEnvironmentVariable("ARCANE_PARALLEL_EXCHANGER_USE_COLLECTIVE");
54 if (use_collective_str=="1" || use_collective_str=="TRUE")
55 m_exchange_mode = EM_Collective;
56}
57
58/*---------------------------------------------------------------------------*/
59/*---------------------------------------------------------------------------*/
60
61ParallelExchanger::
62~ParallelExchanger()
63{
64 for( auto* buf : m_comms_buf )
65 delete buf;
66 m_comms_buf.clear();
67 delete m_own_send_message;
68 delete m_own_recv_message;
69}
70
71/*---------------------------------------------------------------------------*/
72/*---------------------------------------------------------------------------*/
73
74IParallelMng* ParallelExchanger::
75parallelMng() const
76{
77 return m_parallel_mng.get();
78}
79
80/*---------------------------------------------------------------------------*/
81/*---------------------------------------------------------------------------*/
82
83bool ParallelExchanger::
84initializeCommunicationsMessages()
85{
86 Int32 nb_send_rank = m_send_ranks.size();
87 UniqueArray<Int32> gather_input_send_ranks(nb_send_rank+1);
88 gather_input_send_ranks[0] = nb_send_rank;
89 std::copy(std::begin(m_send_ranks),std::end(m_send_ranks),
90 std::begin(gather_input_send_ranks)+1);
91
92 IntegerUniqueArray gather_output_send_ranks;
93 Integer nb_rank = m_parallel_mng->commSize();
94 m_parallel_mng->allGatherVariable(gather_input_send_ranks,
95 gather_output_send_ranks);
96
97 m_recv_ranks.clear();
98 Integer total_comm_rank = 0;
99 Int32 my_rank = m_parallel_mng->commRank();
100 {
101 Integer gather_index = 0;
102 for( Integer i=0; i<nb_rank; ++i ){
103 Integer nb_comm = gather_output_send_ranks[gather_index];
104 total_comm_rank += nb_comm;
105 ++gather_index;
106 for( Integer z=0; z<nb_comm; ++z ){
107 Integer current_rank = gather_output_send_ranks[gather_index+z];
108 if (current_rank==my_rank)
109 m_recv_ranks.add(i);
110 }
111 gather_index += nb_comm;
112 }
113 }
114
115 if (total_comm_rank==0)
116 return true;
117
118 _initializeCommunicationsMessages();
119
120 return false;
121}
122
123/*---------------------------------------------------------------------------*/
124/*---------------------------------------------------------------------------*/
125
126void ParallelExchanger::
127initializeCommunicationsMessages(Int32ConstArrayView recv_ranks)
128{
129 m_recv_ranks.resize(recv_ranks.size());
130 m_recv_ranks.copy(recv_ranks);
131 _initializeCommunicationsMessages();
132}
133
134/*---------------------------------------------------------------------------*/
135/*---------------------------------------------------------------------------*/
136
137void ParallelExchanger::
138_initializeCommunicationsMessages()
139{
140 if (m_verbosity_level>=1){
141 info() << "ParallelExchanger " << m_name << " : nb_send=" << m_send_ranks.size()
142 << " nb_recv=" << m_recv_ranks.size();
143 if (m_verbosity_level>=2){
144 info() << "ParallelExchanger " << m_name << " : send=" << m_send_ranks;
145 info() << "ParallelExchanger " << m_name << " : recv=" << m_recv_ranks;
146 }
147 }
148
149 Int32 my_rank = m_parallel_mng->commRank();
150
151 for( Int32 msg_rank : m_send_ranks ){
152 auto* comm = new SerializeMessage(my_rank,msg_rank,ISerializeMessage::MT_Send);
153 // Il ne sert à rien de s'envoyer des messages.
154 // (En plus ca fait planter certaines versions de MPI...)
155 if (my_rank==msg_rank)
156 m_own_send_message = comm;
157 else
158 m_comms_buf.add(comm);
159 m_send_serialize_infos.add(comm);
160 }
161}
162
163/*---------------------------------------------------------------------------*/
164/*---------------------------------------------------------------------------*/
165
166void ParallelExchanger::
167processExchange()
168{
171 processExchange(options);
172}
173
174/*---------------------------------------------------------------------------*/
175/*---------------------------------------------------------------------------*/
176
177void ParallelExchanger::
178processExchange(const ParallelExchangerOptions& options)
179{
180 if (m_verbosity_level>=1)
181 info() << "ParallelExchanger " << m_name << ": ProcessExchange (begin)"
182 << " date=" << platform::getCurrentDateTime();
183
184 {
185 Timer::Sentry sentry(&m_timer);
186 _processExchange(options);
187 }
188
189 if (m_verbosity_level>=1)
190 info() << "ParallelExchanger " << m_name << ": ProcessExchange (end)"
191 << " total_time=" << m_timer.lastActivationTime();
192}
193
194/*---------------------------------------------------------------------------*/
195/*---------------------------------------------------------------------------*/
196
197void ParallelExchanger::
198_processExchange(const ParallelExchangerOptions& options)
199{
200 if (m_verbosity_level>=1){
201 Int64 total_size = 0;
202 for( SerializeMessage* comm : m_send_serialize_infos ){
203 Int64 message_size = comm->trueSerializer()->totalSize();
204 total_size += message_size;
205 if (m_verbosity_level>=2)
206 info() << "Send rank=" << comm->destination() << " size=" << message_size;
207 }
208 info() << "ParallelExchanger " << m_name << ": ProcessExchange"
209 << " total_size=" << total_size << " nb_message=" << m_comms_buf.size();
210 }
211
212 bool use_all_to_all = false;
213 if (options.exchangeMode())
214 use_all_to_all = true;
215 // TODO: traiter le cas EM_Auto
216
217 // Génère les infos pour chaque processeur de qui on va recevoir
218 // des entités
219 Int32 my_rank = m_parallel_mng->commRank();
220 for( Int32 msg_rank : m_recv_ranks ){
221 auto* comm = new SerializeMessage(my_rank,msg_rank,ISerializeMessage::MT_Recv);
222 // Il ne sert à rien de s'envoyer des messages.
223 // (En plus ca fait planter certaines versions de MPI...)
224 if (my_rank==msg_rank)
225 m_own_recv_message = comm;
226 else
227 m_comms_buf.add(comm);
228 m_recv_serialize_infos.add(comm);
229 }
230
231 if (use_all_to_all)
232 _processExchangeCollective();
233 else{
234 Int32 max_pending = options.maxPendingMessage();
235 if (max_pending>0)
236 _processExchangeWithControl(max_pending);
237 else
238 m_parallel_mng->processMessages(m_comms_buf);
239
240 if (m_own_send_message && m_own_recv_message){
241 m_own_recv_message->serializer()->copy(m_own_send_message->serializer());
242 }
243 }
244
245 // Récupère les infos de chaque receveur
246 for( SerializeMessage* comm : m_recv_serialize_infos )
247 comm->serializer()->setMode(ISerializer::ModeGet);
248}
249
250/*---------------------------------------------------------------------------*/
251/*---------------------------------------------------------------------------*/
252
253void ParallelExchanger::
254_processExchangeCollective()
255{
256 info() << "Using collective exchange in ParallelExchanger";
257
258 IParallelMng* pm = m_parallel_mng.get();
259 Int32 nb_rank = pm->commSize();
260
261 Int32UniqueArray send_counts(nb_rank,0);
262 Int32UniqueArray send_indexes(nb_rank,0);
263 Int32UniqueArray recv_counts(nb_rank,0);
264 Int32UniqueArray recv_indexes(nb_rank,0);
265
266 // D'abord, détermine pour chaque proc le nombre d'octets à envoyer
267 for( SerializeMessage* comm : m_send_serialize_infos ){
268 auto* sbuf = comm->trueSerializer();
269 Span<Byte> val_buf = sbuf->globalBuffer();
270 Int32 rank = comm->destRank();
271 send_counts[rank] = arcaneCheckArraySize(val_buf.size());
272 }
273
274 // Fait un AllToAll pour connaitre combien de valeurs je dois recevoir des autres.
275 {
276 Timer::SimplePrinter sp(traceMng(),"ParallelExchanger: sending sizes with AllToAll");
277 pm->allToAll(send_counts,recv_counts,1);
278 }
279
280 // Détermine le nombre total d'infos à envoyer et recevoir
281
282 // TODO: En cas débordement, il faudrait le faire en plusieurs morceaux
283 // ou alors revenir aux échanges point à point.
284 Int32 total_send = 0;
285 Int32 total_recv = 0;
286 Int64 int64_total_send = 0;
287 Int64 int64_total_recv = 0;
288 for( Integer i=0; i<nb_rank; ++i ){
289 send_indexes[i] = total_send;
290 recv_indexes[i] = total_recv;
291 total_send += send_counts[i];
292 total_recv += recv_counts[i];
293 int64_total_send += send_counts[i];
294 int64_total_recv += recv_counts[i];
295 }
296
297 // Vérifie qu'on ne déborde pas.
298 if (int64_total_send!=total_send)
299 ARCANE_FATAL("Message to send is too big size={0} max=2^31",int64_total_send);
300 if (int64_total_recv!=total_recv)
301 ARCANE_FATAL("Message to receive is too big size={0} max=2^31",int64_total_recv);
302
303 ByteUniqueArray send_buf(total_send);
304 ByteUniqueArray recv_buf(total_recv);
305 bool is_verbose = (m_verbosity_level>=1);
306 if (m_verbosity_level>=2){
307 for( Integer i=0; i<nb_rank; ++i ){
308 if (send_counts[i]!=0 || recv_counts[i]!=0)
309 info() << "INFOS: rank=" << i << " send_count=" << send_counts[i]
310 << " send_idx=" << send_indexes[i]
311 << " recv_count=" << recv_counts[i]
312 << " recv_idx=" << recv_indexes[i];
313 }
314 }
315
316 // Copie dans send_buf les infos des sérialisers.
317 for( SerializeMessage* comm : m_send_serialize_infos ){
318 auto* sbuf = comm->trueSerializer();
319 Span<Byte> val_buf = sbuf->globalBuffer();
320 Int32 rank = comm->destRank();
321 if (is_verbose)
322 info() << "SEND rank=" << rank << " size=" << send_counts[rank]
323 << " idx=" << send_indexes[rank]
324 << " buf_size=" << val_buf.size();
325 ByteArrayView dest_buf(send_counts[rank],&send_buf[send_indexes[rank]]);
326 dest_buf.copy(val_buf);
327 }
328
329 if (is_verbose)
330 info() << "AllToAllVariable total_send=" << total_send
331 << " total_recv=" << total_recv;
332
333 {
334 Timer::SimplePrinter sp(traceMng(),"ParallelExchanger: sending values with AllToAll");
335 pm->allToAllVariable(send_buf,send_counts,send_indexes,recv_buf,recv_counts,recv_indexes);
336 }
337 // Recopie les données reçues dans le message correspondant.
338 for( SerializeMessage* comm : m_recv_serialize_infos ){
339 auto* sbuf = comm->trueSerializer();
340 Int32 rank = comm->destRank();
341 if (is_verbose)
342 info() << "RECV rank=" << rank << " size=" << recv_counts[rank]
343 << " idx=" << recv_indexes[rank];
344 ByteArrayView orig_buf(recv_counts[rank],&recv_buf[recv_indexes[rank]]);
345
346 sbuf->preallocate(orig_buf.size());
347 sbuf->globalBuffer().copy(orig_buf);
348 sbuf->setFromSizes();
349 }
350}
351
352/*---------------------------------------------------------------------------*/
353/*---------------------------------------------------------------------------*/
354
355ISerializeMessage* ParallelExchanger::
356messageToSend(Integer i)
357{
358 return m_send_serialize_infos[i];
359}
360
361/*---------------------------------------------------------------------------*/
362/*---------------------------------------------------------------------------*/
363
364ISerializeMessage* ParallelExchanger::
365messageToReceive(Integer i)
366{
367 return m_recv_serialize_infos[i];
368}
369
370/*---------------------------------------------------------------------------*/
371/*---------------------------------------------------------------------------*/
372
373void ParallelExchanger::
374setVerbosityLevel(Int32 v)
375{
376 if (v<0)
377 v = 0;
379}
380
381/*---------------------------------------------------------------------------*/
382/*---------------------------------------------------------------------------*/
383
384void ParallelExchanger::
385setName(const String& name)
386{
387 m_name = name;
388}
389
390/*---------------------------------------------------------------------------*/
391/*---------------------------------------------------------------------------*/
392
393namespace
394{
395class SortFunctor
396{
397 public:
408 bool operator()(const ISerializeMessage* a,const ISerializeMessage* b)
409 {
410 const int nb_phase = 4;
411 int phase1 = a->destination().value() % nb_phase;
412 int phase2 = b->destination().value() % nb_phase;
413 if (phase1 != phase2)
414 return phase1<phase2;
415 if (a->destination() != b->destination())
416 return a->destination() < b->destination();
417 if (a->isSend() != b->isSend())
418 return (a->isSend() ? false : true);
419 return a->source() < b->source();
420 }
421};
422}
423
424/*---------------------------------------------------------------------------*/
425/*---------------------------------------------------------------------------*/
429void ParallelExchanger::
430_processExchangeWithControl(Int32 max_pending_message)
431{
432 // L'ensemble des messages sont dans 'm_comms_buf'.
433 // On les recopie dans 'sorted_messages' pour qu'ils soient triés.
434
435 auto message_list {m_parallel_mng->createSerializeMessageListRef()};
436
438 std::sort(sorted_messages.begin(),sorted_messages.end(),SortFunctor{});
439
440 Integer position = 0;
441 // Il faut au moins ajouter un minimum de messages pour ne pas avoir de blocage.
442 // A priori le minimum est 2 pour qu'il y est au moins un receive et un send
443 // mais il est préférable de mettre plus pour ne pas trop dégrader les performances.
444 max_pending_message = math::max(4,max_pending_message);
445
446 Integer nb_message = sorted_messages.size();
447 Integer nb_to_add = max_pending_message;
448
449 Int32 verbosity_level = m_verbosity_level;
450
451 if (verbosity_level>=1)
452 info() << "ParallelExchanger " << m_name << " : process exchange WITH CONTROL"
453 << " nb_message=" << nb_message << " max_pending=" << max_pending_message;
454
455 while(position<nb_message){
456 for( Integer i=0; i<nb_to_add; ++i ){
457 if (position>=nb_message)
458 break;
459 ISerializeMessage* message = sorted_messages[position];
460 if (verbosity_level>=2)
461 info() << "Add Message p=" << position << " is_send?=" << message->isSend() << " source=" << message->source()
462 << " dest=" << message->destination();
463 message_list->addMessage(message);
464 ++position;
465 }
466 // S'il ne reste plus de messages, alors on fait un WaitAll pour attendre*
467 // que les messages restants soient tous terminés.
468 if (position>=nb_message){
469 message_list->waitMessages(Parallel::WaitAll);
470 break;
471 }
472 // Le nombre de messages terminés indique combien de message il faudra
473 // ajouter à la liste pour la prochaine itération.
474 Integer nb_done = message_list->waitMessages(Parallel::WaitSome);
475 if (verbosity_level>=2)
476 info() << "Wait nb_done=" << nb_done;
477 if (nb_done==(-1))
478 nb_done = max_pending_message;
479 nb_to_add = nb_done;
480 }
481}
482
483/*---------------------------------------------------------------------------*/
484/*---------------------------------------------------------------------------*/
485
487createParallelExchangerImpl(Ref<IParallelMng> pm)
488{
490}
491
492/*---------------------------------------------------------------------------*/
493/*---------------------------------------------------------------------------*/
494
495} // End namespace Arcane
496
497/*---------------------------------------------------------------------------*/
498/*---------------------------------------------------------------------------*/
499
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Fonctions mathématiques diverses.
Integer size() const
Nombre d'éléments du vecteur.
iterator end()
Itérateur sur le premier élément après la fin du tableau.
iterator begin()
Itérateur sur le premier élément du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Interface du gestionnaire de parallélisme pour un sous-domaine.
Interface d'un message de sérialisation entre IMessagePassingMng.
virtual bool isSend() const =0
true s'il faut envoyer, false s'il faut recevoir
virtual MessageRank destination() const =0
Rang du destinataire (si isSend() est vrai) ou de l'envoyeur.
virtual MessageRank source() const =0
Rang de l'envoyeur du message.
Int32 value() const
Valeur du rang.
Definition MessageRank.h:72
Options pour IParallelMng::processExchange().
Int32 maxPendingMessage() const
Nombre maximal de messages en vol.
void setExchangeMode(eExchangeMode mode)
Positionne le mode d'échange.
eExchangeMode exchangeMode() const
Mode d'échange spécifié
Echange d'informations entre processeurs.
Int32UniqueArray m_send_ranks
Liste des sous-domaines à envoyer.
String m_name
Nom de l'instance utilisé pour l'affichage.
UniqueArray< SerializeMessage * > m_recv_serialize_infos
Liste des message à recevoir.
void processExchange() override
Effectue l'échange avec les options par défaut de ParallelExchangerOptions.
Int32 m_verbosity_level
Niveau de verbosité
Timer m_timer
Timer pour mesurer le temps passé dans les échanges.
String name() const override
Nom de l'instance.
UniqueArray< SerializeMessage * > m_send_serialize_infos
Liste des message à recevoir.
Int32UniqueArray m_recv_ranks
Liste des sous-domaines à recevoir.
eExchangeMode m_exchange_mode
Mode d'échange.
UniqueArray< ISerializeMessage * > m_comms_buf
Liste des message à envoyer et recevoir.
Référence à une instance.
Message utilisant un SerializeBuffer.
Chaîne de caractères unicode.
Sentinelle pour le timer. La sentinelle associée à un timer permet de déclancher celui-ci au moment d...
Definition Timer.h:89
TraceMessage info() const
Flot pour un message d'information.
Vecteur 1D de données avec sémantique par valeur (style STL).
T max(const T &a, const T &b, const T &c)
Retourne le maximum de trois éléments.
Definition MathUtils.h:392
@ WaitSome
Attend que tous les messages de la liste soient traités.
ARCCORE_BASE_EXPORT String getCurrentDateTime()
Date et l'heure courante sous la forme ISO 8601.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Integer arcaneCheckArraySize(unsigned long long size)
Vérifie que size peut être converti dans un 'Integer' pour servir de taille à un tableau....
ArrayView< Byte > ByteArrayView
Equivalent C d'un tableau à une dimension de caractères.
Definition UtilsTypes.h:534
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:569
UniqueArray< Byte > ByteUniqueArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:422
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:428
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
UniqueArray< Integer > IntegerUniqueArray
Tableau dynamique à une dimension d'entiers.
Definition UtilsTypes.h:434
std::int32_t Int32
Type entier signé sur 32 bits.