Arcane  v3.14.10.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
ParallelExchanger.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2022 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ParallelExchanger.cc (C) 2000-2022 */
9/* */
10/* Echange d'informations entre processeurs. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/impl/ParallelExchanger.h"
15
16#include "arcane/utils/NotSupportedException.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/PlatformUtils.h"
19
20#include "arcane/MathUtils.h"
21#include "arcane/IParallelMng.h"
22#include "arcane/SerializeBuffer.h"
23#include "arcane/SerializeMessage.h"
24#include "arcane/Timer.h"
25#include "arcane/ISerializeMessageList.h"
26
27#include <algorithm>
28
29/*---------------------------------------------------------------------------*/
30/*---------------------------------------------------------------------------*/
31
32namespace Arcane
33{
34
35/*---------------------------------------------------------------------------*/
36/*---------------------------------------------------------------------------*/
37
38ParallelExchanger::
39ParallelExchanger(IParallelMng* pm)
40: ParallelExchanger(makeRef(pm))
41{
42}
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
46
47ParallelExchanger::
48ParallelExchanger(Ref<IParallelMng> pm)
49: TraceAccessor(pm->traceMng())
50, m_parallel_mng(pm)
51, m_timer(pm->timerMng(),"ParallelExchangerTimer",Timer::TimerReal)
52{
53 String use_collective_str = platform::getEnvironmentVariable("ARCANE_PARALLEL_EXCHANGER_USE_COLLECTIVE");
54 if (use_collective_str=="1" || use_collective_str=="TRUE")
55 m_exchange_mode = EM_Collective;
56}
57
58/*---------------------------------------------------------------------------*/
59/*---------------------------------------------------------------------------*/
60
61ParallelExchanger::
62~ParallelExchanger()
63{
64 for( auto* buf : m_comms_buf )
65 delete buf;
66 m_comms_buf.clear();
67 delete m_own_send_message;
68 delete m_own_recv_message;
69}
70
71/*---------------------------------------------------------------------------*/
72/*---------------------------------------------------------------------------*/
73
74IParallelMng* ParallelExchanger::
75parallelMng() const
76{
77 return m_parallel_mng.get();
78}
79
80/*---------------------------------------------------------------------------*/
81/*---------------------------------------------------------------------------*/
82
83bool ParallelExchanger::
84initializeCommunicationsMessages()
85{
86 Int32 nb_send_rank = m_send_ranks.size();
89 std::copy(std::begin(m_send_ranks),std::end(m_send_ranks),
90 std::begin(gather_input_send_ranks)+1);
91
93 Integer nb_rank = m_parallel_mng->commSize();
94 m_parallel_mng->allGatherVariable(gather_input_send_ranks,
96
97 m_recv_ranks.clear();
98 Integer total_comm_rank = 0;
99 Int32 my_rank = m_parallel_mng->commRank();
100 {
101 Integer gather_index = 0;
102 for( Integer i=0; i<nb_rank; ++i ){
105 ++gather_index;
106 for( Integer z=0; z<nb_comm; ++z ){
109 m_recv_ranks.add(i);
110 }
112 }
113 }
114
115 if (total_comm_rank==0)
116 return true;
117
118 _initializeCommunicationsMessages();
119
120 return false;
121}
122
123/*---------------------------------------------------------------------------*/
124/*---------------------------------------------------------------------------*/
125
126void ParallelExchanger::
127initializeCommunicationsMessages(Int32ConstArrayView recv_ranks)
128{
129 m_recv_ranks.resize(recv_ranks.size());
130 m_recv_ranks.copy(recv_ranks);
131 _initializeCommunicationsMessages();
132}
133
134/*---------------------------------------------------------------------------*/
135/*---------------------------------------------------------------------------*/
136
137void ParallelExchanger::
138_initializeCommunicationsMessages()
139{
140 if (m_verbosity_level>=1){
141 info() << "ParallelExchanger " << m_name << " : nb_send=" << m_send_ranks.size()
142 << " nb_recv=" << m_recv_ranks.size();
143 if (m_verbosity_level>=2){
144 info() << "ParallelExchanger " << m_name << " : send=" << m_send_ranks;
145 info() << "ParallelExchanger " << m_name << " : recv=" << m_recv_ranks;
146 }
147 }
148
149 Int32 my_rank = m_parallel_mng->commRank();
150
151 for( Int32 msg_rank : m_send_ranks ){
152 auto* comm = new SerializeMessage(my_rank,msg_rank,ISerializeMessage::MT_Send);
153 // Il ne sert à rien de s'envoyer des messages.
154 // (En plus ca fait planter certaines versions de MPI...)
155 if (my_rank==msg_rank)
156 m_own_send_message = comm;
157 else
158 m_comms_buf.add(comm);
159 m_send_serialize_infos.add(comm);
160 }
161}
162
163/*---------------------------------------------------------------------------*/
164/*---------------------------------------------------------------------------*/
165
166void ParallelExchanger::
167processExchange()
168{
170 options.setExchangeMode(static_cast<ParallelExchangerOptions::eExchangeMode>(m_exchange_mode));
171 processExchange(options);
172}
173
174/*---------------------------------------------------------------------------*/
175/*---------------------------------------------------------------------------*/
176
177void ParallelExchanger::
178processExchange(const ParallelExchangerOptions& options)
179{
180 if (m_verbosity_level>=1)
181 info() << "ParallelExchanger " << m_name << ": ProcessExchange (begin)"
182 << " date=" << platform::getCurrentDateTime();
183
184 {
185 Timer::Sentry sentry(&m_timer);
186 _processExchange(options);
187 }
188
189 if (m_verbosity_level>=1)
190 info() << "ParallelExchanger " << m_name << ": ProcessExchange (end)"
191 << " total_time=" << m_timer.lastActivationTime();
192}
193
194/*---------------------------------------------------------------------------*/
195/*---------------------------------------------------------------------------*/
196
197void ParallelExchanger::
198_processExchange(const ParallelExchangerOptions& options)
199{
200 if (m_verbosity_level>=1){
201 Int64 total_size = 0;
202 for( SerializeMessage* comm : m_send_serialize_infos ){
203 Int64 message_size = comm->trueSerializer()->totalSize();
205 if (m_verbosity_level>=2)
206 info() << "Send rank=" << comm->destination() << " size=" << message_size;
207 }
208 info() << "ParallelExchanger " << m_name << ": ProcessExchange"
209 << " total_size=" << total_size << " nb_message=" << m_comms_buf.size();
210 }
211
212 bool use_all_to_all = false;
213 if (options.exchangeMode())
214 use_all_to_all = true;
215 // TODO: traiter le cas EM_Auto
216
217 // Génère les infos pour chaque processeur de qui on va recevoir
218 // des entités
219 Int32 my_rank = m_parallel_mng->commRank();
220 for( Int32 msg_rank : m_recv_ranks ){
221 auto* comm = new SerializeMessage(my_rank,msg_rank,ISerializeMessage::MT_Recv);
222 // Il ne sert à rien de s'envoyer des messages.
223 // (En plus ca fait planter certaines versions de MPI...)
224 if (my_rank==msg_rank)
225 m_own_recv_message = comm;
226 else
227 m_comms_buf.add(comm);
228 m_recv_serialize_infos.add(comm);
229 }
230
231 if (use_all_to_all)
232 _processExchangeCollective();
233 else{
234 Int32 max_pending = options.maxPendingMessage();
235 if (max_pending>0)
236 _processExchangeWithControl(max_pending);
237 else
238 m_parallel_mng->processMessages(m_comms_buf);
239
240 if (m_own_send_message && m_own_recv_message){
241 m_own_recv_message->serializer()->copy(m_own_send_message->serializer());
242 }
243 }
244
245 // Récupère les infos de chaque receveur
246 for( SerializeMessage* comm : m_recv_serialize_infos )
247 comm->serializer()->setMode(ISerializer::ModeGet);
248}
249
250/*---------------------------------------------------------------------------*/
251/*---------------------------------------------------------------------------*/
252
253void ParallelExchanger::
254_processExchangeCollective()
255{
256 info() << "Using collective exchange in ParallelExchanger";
257
258 IParallelMng* pm = m_parallel_mng.get();
259 Int32 nb_rank = pm->commSize();
260
261 Int32UniqueArray send_counts(nb_rank,0);
262 Int32UniqueArray send_indexes(nb_rank,0);
263 Int32UniqueArray recv_counts(nb_rank,0);
264 Int32UniqueArray recv_indexes(nb_rank,0);
265
266 // D'abord, détermine pour chaque proc le nombre d'octets à envoyer
267 for( SerializeMessage* comm : m_send_serialize_infos ){
268 auto* sbuf = comm->trueSerializer();
269 Span<Byte> val_buf = sbuf->globalBuffer();
270 Int32 rank = comm->destRank();
271 send_counts[rank] = arcaneCheckArraySize(val_buf.size());
272 }
273
274 // Fait un AllToAll pour connaitre combien de valeurs je dois recevoir des autres.
275 {
276 Timer::SimplePrinter sp(traceMng(),"ParallelExchanger: sending sizes with AllToAll");
277 pm->allToAll(send_counts,recv_counts,1);
278 }
279
280 // Détermine le nombre total d'infos à envoyer et recevoir
281
282 // TODO: En cas débordement, il faudrait le faire en plusieurs morceaux
283 // ou alors revenir aux échanges point à point.
284 Int32 total_send = 0;
285 Int32 total_recv = 0;
286 Int64 int64_total_send = 0;
287 Int64 int64_total_recv = 0;
288 for( Integer i=0; i<nb_rank; ++i ){
289 send_indexes[i] = total_send;
290 recv_indexes[i] = total_recv;
291 total_send += send_counts[i];
292 total_recv += recv_counts[i];
293 int64_total_send += send_counts[i];
294 int64_total_recv += recv_counts[i];
295 }
296
297 // Vérifie qu'on ne déborde pas.
298 if (int64_total_send!=total_send)
299 ARCANE_FATAL("Message to send is too big size={0} max=2^31",int64_total_send);
300 if (int64_total_recv!=total_recv)
301 ARCANE_FATAL("Message to receive is too big size={0} max=2^31",int64_total_recv);
302
303 ByteUniqueArray send_buf(total_send);
304 ByteUniqueArray recv_buf(total_recv);
305 bool is_verbose = (m_verbosity_level>=1);
306 if (m_verbosity_level>=2){
307 for( Integer i=0; i<nb_rank; ++i ){
308 if (send_counts[i]!=0 || recv_counts[i]!=0)
309 info() << "INFOS: rank=" << i << " send_count=" << send_counts[i]
310 << " send_idx=" << send_indexes[i]
311 << " recv_count=" << recv_counts[i]
312 << " recv_idx=" << recv_indexes[i];
313 }
314 }
315
316 // Copie dans send_buf les infos des sérialisers.
317 for( SerializeMessage* comm : m_send_serialize_infos ){
318 auto* sbuf = comm->trueSerializer();
319 Span<Byte> val_buf = sbuf->globalBuffer();
320 Int32 rank = comm->destRank();
321 if (is_verbose)
322 info() << "SEND rank=" << rank << " size=" << send_counts[rank]
323 << " idx=" << send_indexes[rank]
324 << " buf_size=" << val_buf.size();
325 ByteArrayView dest_buf(send_counts[rank],&send_buf[send_indexes[rank]]);
326 dest_buf.copy(val_buf);
327 }
328
329 if (is_verbose)
330 info() << "AllToAllVariable total_send=" << total_send
331 << " total_recv=" << total_recv;
332
333 {
334 Timer::SimplePrinter sp(traceMng(),"ParallelExchanger: sending values with AllToAll");
335 pm->allToAllVariable(send_buf,send_counts,send_indexes,recv_buf,recv_counts,recv_indexes);
336 }
337 // Recopie les données reçues dans le message correspondant.
338 for( SerializeMessage* comm : m_recv_serialize_infos ){
339 auto* sbuf = comm->trueSerializer();
340 Int32 rank = comm->destRank();
341 if (is_verbose)
342 info() << "RECV rank=" << rank << " size=" << recv_counts[rank]
343 << " idx=" << recv_indexes[rank];
344 ByteArrayView orig_buf(recv_counts[rank],&recv_buf[recv_indexes[rank]]);
345
346 sbuf->preallocate(orig_buf.size());
347 sbuf->globalBuffer().copy(orig_buf);
348 sbuf->setFromSizes();
349 }
350}
351
352/*---------------------------------------------------------------------------*/
353/*---------------------------------------------------------------------------*/
354
355ISerializeMessage* ParallelExchanger::
356messageToSend(Integer i)
357{
358 return m_send_serialize_infos[i];
359}
360
361/*---------------------------------------------------------------------------*/
362/*---------------------------------------------------------------------------*/
363
364ISerializeMessage* ParallelExchanger::
365messageToReceive(Integer i)
366{
367 return m_recv_serialize_infos[i];
368}
369
370/*---------------------------------------------------------------------------*/
371/*---------------------------------------------------------------------------*/
372
373void ParallelExchanger::
374setVerbosityLevel(Int32 v)
375{
376 if (v<0)
377 v = 0;
378 m_verbosity_level = v;
379}
380
381/*---------------------------------------------------------------------------*/
382/*---------------------------------------------------------------------------*/
383
384void ParallelExchanger::
385setName(const String& name)
386{
387 m_name = name;
388}
389
390/*---------------------------------------------------------------------------*/
391/*---------------------------------------------------------------------------*/
392
393namespace
394{
395class SortFunctor
396{
397 public:
408 bool operator()(const ISerializeMessage* a,const ISerializeMessage* b)
409 {
410 const int nb_phase = 4;
411 int phase1 = a->destination().value() % nb_phase;
412 int phase2 = b->destination().value() % nb_phase;
413 if (phase1 != phase2)
414 return phase1<phase2;
415 if (a->destination() != b->destination())
416 return a->destination() < b->destination();
417 if (a->isSend() != b->isSend())
418 return (a->isSend() ? false : true);
419 return a->source() < b->source();
420 }
421};
422}
423
424/*---------------------------------------------------------------------------*/
425/*---------------------------------------------------------------------------*/
429void ParallelExchanger::
430_processExchangeWithControl(Int32 max_pending_message)
431{
432 // L'ensemble des messages sont dans 'm_comms_buf'.
433 // On les recopie dans 'sorted_messages' pour qu'ils soient triés.
434
435 auto message_list {m_parallel_mng->createSerializeMessageListRef()};
436
438 std::sort(sorted_messages.begin(),sorted_messages.end(),SortFunctor{});
439
440 Integer position = 0;
441 // Il faut au moins ajouter un minimum de messages pour ne pas avoir de blocage.
442 // A priori le minimum est 2 pour qu'il y est au moins un receive et un send
443 // mais il est préférable de mettre plus pour ne pas trop dégrader les performances.
445
446 Integer nb_message = sorted_messages.size();
448
449 Int32 verbosity_level = m_verbosity_level;
450
451 if (verbosity_level>=1)
452 info() << "ParallelExchanger " << m_name << " : process exchange WITH CONTROL"
453 << " nb_message=" << nb_message << " max_pending=" << max_pending_message;
454
455 while(position<nb_message){
456 for( Integer i=0; i<nb_to_add; ++i ){
457 if (position>=nb_message)
458 break;
459 ISerializeMessage* message = sorted_messages[position];
460 if (verbosity_level>=2)
461 info() << "Add Message p=" << position << " is_send?=" << message->isSend() << " source=" << message->source()
462 << " dest=" << message->destination();
463 message_list->addMessage(message);
464 ++position;
465 }
466 // S'il ne reste plus de messages, alors on fait un WaitAll pour attendre*
467 // que les messages restants soient tous terminés.
468 if (position>=nb_message){
469 message_list->waitMessages(Parallel::WaitAll);
470 break;
471 }
472 // Le nombre de messages terminés indique combien de message il faudra
473 // ajouter à la liste pour la prochaine itération.
474 Integer nb_done = message_list->waitMessages(Parallel::WaitSome);
475 if (verbosity_level>=2)
476 info() << "Wait nb_done=" << nb_done;
477 if (nb_done==(-1))
480 }
481}
482
483/*---------------------------------------------------------------------------*/
484/*---------------------------------------------------------------------------*/
485
487createParallelExchangerImpl(Ref<IParallelMng> pm)
488{
490}
491
492/*---------------------------------------------------------------------------*/
493/*---------------------------------------------------------------------------*/
494
495} // End namespace Arcane
496
497/*---------------------------------------------------------------------------*/
498/*---------------------------------------------------------------------------*/
499
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
virtual Int32 commSize() const =0
Nombre d'instance dans le communicateur.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:120
Options pour IParallelMng::processExchange().
Int32 maxPendingMessage() const
Nombre maximal de messages en vol.
void setExchangeMode(eExchangeMode mode)
Positionne le mode d'échange.
eExchangeMode exchangeMode() const
Mode d'échange spécifié
Echange d'informations entre processeurs.
Message utilisant un SerializeBuffer.
Sentinelle pour le timer. La sentinelle associée à un timer permet de déclancher celui-ci au moment d...
Definition Timer.h:89
Vue constante d'un tableau de type T.
virtual MessageRank destination() const =0
Rang du destinataire (si isSend() est vrai) ou de l'envoyeur.
virtual bool isSend() const =0
true s'il faut envoyer, false s'il faut recevoir
virtual MessageRank source() const =0
Rang de l'envoyeur du message.
Chaîne de caractères unicode.
Vecteur 1D de données avec sémantique par valeur (style STL).
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Integer arcaneCheckArraySize(unsigned long long size)
Vérifie que size peut être converti dans un 'Integer' pour servir de taille à un tableau....
UniqueArray< Byte > ByteUniqueArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:509
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:515
ArrayView< Byte > ByteArrayView
Equivalent C d'un tableau à une dimension de caractères.
Definition UtilsTypes.h:605
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.