Arcane  v3.14.10.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
HybridParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* HybridParallelMng.cc (C) 2000-2024 */
9/* */
10/* Gestionnaire de parallélisme utilisant un mixte MPI/Threads. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/mpithread/HybridParallelMng.h"
15
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/NumericTypes.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/IThreadBarrier.h"
22#include "arcane/utils/ITraceMng.h"
23
24#include "arcane/core/parallel/IStat.h"
25
26#include "arcane/parallel/mpithread/HybridParallelDispatch.h"
27#include "arcane/parallel/mpithread/HybridMessageQueue.h"
28#include "arcane/parallel/mpi/MpiParallelMng.h"
29
30#include "arcane/core/SerializeMessage.h"
31#include "arcane/core/IIOMng.h"
32#include "arcane/core/Timer.h"
33#include "arcane/core/ISerializeMessageList.h"
34#include "arcane/core/IItemFamily.h"
35#include "arcane/core/internal/IParallelMngInternal.h"
36
37#include "arcane/impl/TimerMng.h"
38#include "arcane/impl/ParallelReplication.h"
39#include "arcane/impl/SequentialParallelMng.h"
40#include "arcane/impl/ParallelMngUtilsFactoryBase.h"
41
43#include "arccore/message_passing/RequestListBase.h"
44#include "arccore/message_passing/SerializeMessageList.h"
45
46/*---------------------------------------------------------------------------*/
47/*---------------------------------------------------------------------------*/
48
49namespace Arcane
50{
51extern "C++" IIOMng*
52arcaneCreateIOMng(IParallelMng* psm);
53}
54
56{
57
58/*---------------------------------------------------------------------------*/
59/*---------------------------------------------------------------------------*/
60
61// NOTE: Cette classe n'est plus utilisée. Elle reste pour référence
62// et sera supprimée ultérieurement
65{
66 public:
68 {
69 public:
71 : m_message(message), m_request(request){}
72 public:
73 ISerializeMessage* m_message = nullptr;
74 Request m_request;
75 };
76
77 public:
78
80 : m_parallel_mng(mpm), m_trace(mpm->traceMng())
81 {
82 }
83
84 public:
85
87 {
88 m_messages_to_process.add(msg);
89
90 }
91 void processPendingMessages() override
92 {
93 }
94
96 {
97 switch(wait_type){
98 case Parallel::WaitAll:
99 // Pour l'instant seul le mode bloquant est supporté.
100 //m_parallel_mng->processMessages(m_messages_to_process);
101 _wait(Parallel::WaitAll);
102 m_messages_to_process.clear();
103 return (-1);
104 case Parallel::WaitSome:
106 case Parallel::WaitSomeNonBlocking:
107 ARCANE_THROW(NotImplementedException,"WaitSomeNonBlocking");
108 }
109 return (-1);
110 }
111
112 private:
113
114 HybridParallelMng* m_parallel_mng;
115 ITraceMng* m_trace;
116 UniqueArray<ISerializeMessage*> m_messages_to_process;
117
118 void _waitAll();
119 void _wait(Parallel::eWaitType wait_mode);
120};
121
122/*---------------------------------------------------------------------------*/
123/*---------------------------------------------------------------------------*/
124
125void HybridSerializeMessageList::
127{
128 m_trace->info() << "BEGIN PROCESS MESSAGES";
129
130 // TODO: gérer la memoire sans faire de new.
131 ConstArrayView<ISerializeMessage*> messages = m_messages_to_process;
132 HybridMessageQueue* message_queue = m_parallel_mng->m_message_queue;
135 for( ISerializeMessage* sm : messages ){
136 ISerializer* s = sm->serializer();
137 MessageRank orig(sm->source());
138 MessageRank dest(sm->destination());
139 PointToPointMessageInfo message_info(orig,dest,HYBRID_MESSAGE_TAG,Parallel::NonBlocking);
140 Request r;
141 if (sm->isSend())
142 r = message_queue->addSend(message_info,SendBufferInfo(s));
143 else
144 r = message_queue->addReceive(message_info,ReceiveBufferInfo(s));
145 all_requests.add(r);
146 }
147
148 if (wait_mode==Parallel::WaitAll)
149 message_queue->waitAll(all_requests);
150
151 for( ISerializeMessage* sm : messages )
152 sm->setFinished(true);
153}
154
155/*---------------------------------------------------------------------------*/
156/*---------------------------------------------------------------------------*/
157
158/*---------------------------------------------------------------------------*/
159/*---------------------------------------------------------------------------*/
160
161HybridParallelMng::
162HybridParallelMng(const HybridParallelMngBuildInfo& bi)
163: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.local_rank,bi.local_nb_rank))
164, m_trace(bi.trace_mng)
165, m_thread_mng(bi.thread_mng)
166, m_world_parallel_mng(bi.world_parallel_mng)
167, m_io_mng(nullptr)
168, m_timer_mng(nullptr)
169, m_replication(new ParallelReplication())
170, m_message_queue(new HybridMessageQueue(bi.message_queue,bi.mpi_parallel_mng,bi.local_nb_rank))
171, m_is_initialized(false)
172, m_stat(Parallel::createDefaultStat())
173, m_thread_barrier(bi.thread_barrier)
174, m_mpi_parallel_mng(bi.mpi_parallel_mng)
175, m_all_dispatchers(bi.all_dispatchers)
176, m_sub_builder_factory(bi.sub_builder_factory)
177, m_parent_container_ref(bi.container)
178, m_utils_factory(createRef<ParallelMngUtilsFactoryBase>())
179{
180 if (!m_world_parallel_mng)
181 m_world_parallel_mng = this;
182
183 // TODO: vérifier que tous les autres HybridParallelMng ont bien
184 // le même nombre de rang locaux (m_local_nb_rank)
185 m_local_rank = bi.local_rank;
186 m_local_nb_rank = bi.local_nb_rank;
187
188 Int32 mpi_rank = m_mpi_parallel_mng->commRank();
189 Int32 mpi_size = m_mpi_parallel_mng->commSize();
190
191 m_global_rank = m_local_rank + mpi_rank * m_local_nb_rank;
192 m_global_nb_rank = mpi_size * m_local_nb_rank;
193
194 m_is_parallel = m_global_nb_rank!=1;
195}
196
197/*---------------------------------------------------------------------------*/
198/*---------------------------------------------------------------------------*/
199
200HybridParallelMng::
201~HybridParallelMng()
202{
203 m_sequential_parallel_mng.reset();
204 delete m_replication;
205 delete m_io_mng;
206 delete m_message_queue;
207 delete m_timer_mng;
208 delete m_stat;
209 delete m_mpi_parallel_mng;
210}
211
212/*---------------------------------------------------------------------------*/
213/*---------------------------------------------------------------------------*/
214
215namespace
216{
217// Classe pour créer les différents dispatchers
218class DispatchCreator
219{
220 public:
221 DispatchCreator(ITraceMng* tm,HybridParallelMng* mpm,HybridMessageQueue* message_queue,MpiThreadAllDispatcher* all_dispatchers)
222 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue), m_all_dispatchers(all_dispatchers){}
223 public:
224 template<typename DataType> HybridParallelDispatch<DataType>*
225 create()
226 {
227 HybridMessageQueue* tmq = m_message_queue;
228 MpiThreadAllDispatcher* ad = m_all_dispatchers;
229 auto field = ad->instance((DataType*)nullptr).view();
230 return new HybridParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
231 }
232
233 ITraceMng* m_tm;
234 HybridParallelMng* m_mpm;
235 HybridMessageQueue* m_message_queue;
236 MpiThreadAllDispatcher* m_all_dispatchers;
237};
238}
239
240/*---------------------------------------------------------------------------*/
241/*---------------------------------------------------------------------------*/
242
244build()
245{
246 ITraceMng* tm = traceMng();
247 tm->info() << "Initialise HybridParallelMng"
248 << " global_rank=" << m_global_rank
249 << " local_rank=" << m_local_rank
250 << " mpi_rank=" << m_mpi_parallel_mng->commRank();
251
252 m_timer_mng = new TimerMng(tm);
253
254 // Créé le gestionnaire séquentiel associé.
255 {
257 bi.setTraceMng(traceMng());
258 bi.setCommunicator(communicator());
259 bi.setThreadMng(threadMng());
260 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
261 }
262
263 DispatchCreator creator(m_trace,this,m_message_queue,m_all_dispatchers);
264 this->createDispatchers(creator);
265 m_io_mng = arcaneCreateIOMng(this);
266}
267
268/*---------------------------------------------------------------------------*/
269/*---------------------------------------------------------------------------*/
270
271/*----------------------------------------------------------------------------*/
272/*---------------------------------------------------------------------------*/
273
276{
277 Trace::Setter mci(m_trace,"Thread");
278 if (m_is_initialized){
279 m_trace->warning() << "HybridParallelMng already initialized";
280 return;
281 }
282
283 m_is_initialized = true;
284}
285
286/*---------------------------------------------------------------------------*/
287/*---------------------------------------------------------------------------*/
288
289SerializeBuffer* HybridParallelMng::
290_castSerializer(ISerializer* serializer)
291{
292 auto sbuf = dynamic_cast<SerializeBuffer*>(serializer);
293 if (!sbuf)
294 ARCANE_THROW(ArgumentException,"can not cast 'ISerializer' to 'SerializeBuffer'");
295 return sbuf;
296}
297
298/*---------------------------------------------------------------------------*/
299/*---------------------------------------------------------------------------*/
300
303{
304 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
305}
306
309{
310 return m_utils_factory->createTransferValuesOperation(this)._release();
311}
312
315{
316 return m_utils_factory->createExchanger(this)._release();
317}
318
319/*---------------------------------------------------------------------------*/
320/*---------------------------------------------------------------------------*/
321
322/*---------------------------------------------------------------------------*/
323/*---------------------------------------------------------------------------*/
324
325void HybridParallelMng::
326sendSerializer(ISerializer* s,Int32 rank)
327{
328 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
329 Request r = m_message_queue->addSend(p2p_message,s);
330 m_message_queue->waitAll(ArrayView<Request>(1,&r));
331}
332
333/*---------------------------------------------------------------------------*/
334/*---------------------------------------------------------------------------*/
335
336auto HybridParallelMng::
337sendSerializer(ISerializer* s,Int32 rank,ByteArray& bytes) -> Request
338{
339 ARCANE_UNUSED(bytes);
340 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
341 return m_message_queue->addSend(p2p_message,s);
342}
343
344/*---------------------------------------------------------------------------*/
345/*---------------------------------------------------------------------------*/
346
348createSendSerializer(Int32 rank)
349{
350 return new SerializeMessage(m_global_rank,rank,ISerializeMessage::MT_Send);
351}
352
353/*---------------------------------------------------------------------------*/
354/*---------------------------------------------------------------------------*/
355
356void HybridParallelMng::
357broadcastSerializer(ISerializer* values,Int32 rank)
358{
359 Timer::Phase tphase(timeStats(),TP_Communication);
360 SerializeBuffer* sbuf = _castSerializer(values);
361
362 bool is_broadcaster = (rank==commRank());
363
364 // Effectue l'envoie en deux phases. Envoie d'abord le nombre d'éléments
365 // puis envoie les éléments.
366 // TODO: il serait possible de le faire en une fois pour les messages
367 // ne dépassant pas une certaine taille.
368
370 if (is_broadcaster){
371 Int64 total_size = sbuf->totalSize();
372 Span<Byte> bytes = sbuf->globalBuffer();
373 this->broadcast(Int64ArrayView(1,&total_size),rank);
374 mpBroadcast(mpm,bytes,rank);
375 }
376 else{
377 Int64 total_size = 0;
378 this->broadcast(Int64ArrayView(1,&total_size),rank);
379 sbuf->preallocate(total_size);
380 Span<Byte> bytes = sbuf->globalBuffer();
381 mpBroadcast(mpm,bytes,rank);
382 sbuf->setFromSizes();
383 }
384}
385
386/*---------------------------------------------------------------------------*/
387/*---------------------------------------------------------------------------*/
388
389void HybridParallelMng::
390recvSerializer(ISerializer* s,Int32 rank)
391{
392 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
393 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
394 m_message_queue->waitAll(ArrayView<Request>(1,&r));
395}
396
397/*---------------------------------------------------------------------------*/
398/*---------------------------------------------------------------------------*/
399
401createReceiveSerializer(Int32 rank)
402{
403 return new SerializeMessage(m_global_rank,rank,ISerializeMessage::MT_Recv);
404}
405
406/*---------------------------------------------------------------------------*/
407/*---------------------------------------------------------------------------*/
408
411{
412 ARCANE_UNUSED(requests);
413 throw NotImplementedException(A_FUNCINFO);
414}
415
416/*---------------------------------------------------------------------------*/
417/*---------------------------------------------------------------------------*/
418
420probe(const PointToPointMessageInfo& message)
421{
423 p2p_message.setEmiterRank(MessageRank(m_global_rank));
424 return m_message_queue->probe(p2p_message);
425}
426
427/*---------------------------------------------------------------------------*/
428/*---------------------------------------------------------------------------*/
429
432{
434 p2p_message.setEmiterRank(MessageRank(m_global_rank));
435 return m_message_queue->legacyProbe(p2p_message);
436}
437
438/*---------------------------------------------------------------------------*/
439/*---------------------------------------------------------------------------*/
440
441Request HybridParallelMng::
442sendSerializer(const ISerializer* s,const PointToPointMessageInfo& message)
443{
444 auto p2p_message = buildMessage(message);
445 return m_message_queue->addSend(p2p_message,s);
446}
447
448/*---------------------------------------------------------------------------*/
449/*---------------------------------------------------------------------------*/
450
451Request HybridParallelMng::
452receiveSerializer(ISerializer* s,const PointToPointMessageInfo& message)
453{
454 auto p2p_message = buildMessage(message);
455 return m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
456}
457
458/*---------------------------------------------------------------------------*/
459/*---------------------------------------------------------------------------*/
460
463{
464 if (m_stat)
465 m_stat->print(m_trace);
466}
467
468/*---------------------------------------------------------------------------*/
469/*---------------------------------------------------------------------------*/
470
472barrier()
473{
474 m_thread_barrier->wait();
475 if (m_local_rank==0)
476 m_mpi_parallel_mng->barrier();
477 m_thread_barrier->wait();
478}
479
480/*---------------------------------------------------------------------------*/
481/*---------------------------------------------------------------------------*/
482
483ISerializeMessageList* HybridParallelMng::
484_createSerializeMessageList()
485{
486 auto* x = new MP::internal::SerializeMessageList(messagePassingMng());
487 x->setAllowAnyRankReceive(false);
488 return x;
489}
490
491/*---------------------------------------------------------------------------*/
492/*---------------------------------------------------------------------------*/
493
496{
497 return m_utils_factory->createSynchronizer(this,family)._release();
498}
499
500/*---------------------------------------------------------------------------*/
501/*---------------------------------------------------------------------------*/
502
504createSynchronizer(const ItemGroup& group)
505{
506 return m_utils_factory->createSynchronizer(this,group)._release();
507}
508
509/*---------------------------------------------------------------------------*/
510/*---------------------------------------------------------------------------*/
511
514{
515 return m_utils_factory->createTopology(this)._release();
516}
517
518/*---------------------------------------------------------------------------*/
519/*---------------------------------------------------------------------------*/
520
522replication() const
523{
524 return m_replication;
525}
526
527/*---------------------------------------------------------------------------*/
528/*---------------------------------------------------------------------------*/
529
532{
533 delete m_replication;
534 m_replication = v;
535}
536
537/*---------------------------------------------------------------------------*/
538/*---------------------------------------------------------------------------*/
539
542{
543 return m_sequential_parallel_mng.get();
544}
545
546/*---------------------------------------------------------------------------*/
547/*---------------------------------------------------------------------------*/
548
549Ref<IParallelMng> HybridParallelMng::
550sequentialParallelMngRef()
551{
552 return m_sequential_parallel_mng;
553}
554
555/*---------------------------------------------------------------------------*/
556/*---------------------------------------------------------------------------*/
562{
564 public:
566 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
567 m_local_rank(m_parallel_mng->localRank()) {}
568 public:
570 {
571 switch(wait_type){
572 case Parallel::WaitAll:
573 m_parallel_mng->m_message_queue->waitAll(_requests());
574 break;
575 case Parallel::WaitSome:
576 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),false);
577 break;
578 case Parallel::WaitSomeNonBlocking:
579 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),true);
580 }
581 }
582 private:
583 HybridParallelMng* m_parallel_mng;
584 HybridMessageQueue* m_message_queue;
585 Int32 m_local_rank;
586};
587
588/*---------------------------------------------------------------------------*/
589/*---------------------------------------------------------------------------*/
590
597
598/*---------------------------------------------------------------------------*/
599/*---------------------------------------------------------------------------*/
600
603{
604 m_message_queue->waitAll(requests);
605}
606
607/*---------------------------------------------------------------------------*/
608/*---------------------------------------------------------------------------*/
609
612{
613 return m_mpi_parallel_mng->getMPICommunicator();
614}
615
616/*---------------------------------------------------------------------------*/
617/*---------------------------------------------------------------------------*/
618
620communicator() const
621{
622 return m_mpi_parallel_mng->communicator();
623}
624
625/*---------------------------------------------------------------------------*/
626/*---------------------------------------------------------------------------*/
627
630{
632 p2p_message.setEmiterRank(MessageRank(m_global_rank));
633 return p2p_message;
634}
635
636/*---------------------------------------------------------------------------*/
637/*---------------------------------------------------------------------------*/
638
644
645/*---------------------------------------------------------------------------*/
646/*---------------------------------------------------------------------------*/
647
648IParallelMng* HybridParallelMng::
649_createSubParallelMng(Int32ConstArrayView kept_ranks)
650{
651 ARCANE_UNUSED(kept_ranks);
652 ARCANE_THROW(NotSupportedException,"Use createSubParallelMngRef() instead");
653}
654
655/*---------------------------------------------------------------------------*/
656/*---------------------------------------------------------------------------*/
657
660{
661 // ATTENTION: Cette méthode est appelée simultanément par tous les threads
662 // partageant cet HybridParallelMng.
663
664 if (kept_ranks.empty())
665 ARCANE_FATAL("kept_ranks is empty");
666 ARCANE_CHECK_POINTER(m_sub_builder_factory);
667
668 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF";
669
670 /*
671 Il existe plusieurs possibilités:
672 1. on réduit juste le nombre de rangs en mémoire partagé pour chaque
673 processus MPI -< on créé un HybridParallelMng
674 2. On ne garde que le rang maitre de chaque processus MPI -> on créé un MpiParallelMng.
675 3. On ne garde que les rangs d'un même processus -> on créé un SharedMemoryParallelMng
676 4. On ne garde qu'un seul rang: on crée un MpiSequentialParallelMng.
677 */
678 // Pour l'instant, on ne supporte que le cas 1 et 2.
679 Int32 nb_kept_rank = kept_ranks.size();
680
681 // Détermine le nouveau nombre de rangs locaux par rang MPI.
682
683 // Regarde si je suis dans les listes des rangs conservés et si oui
684 // détermine mon rang dans le IParallelMng créé
687 // Mon nouveau rang local. Négatif si je ne suis pas dans le nouveau communicateur
688 Int32 my_new_global_rank = (-1);
689 Int32 new_local_nb_rank = 0;
690 Int32 my_new_local_rank = (-1);
691 for( Integer i=0; i<nb_kept_rank; ++i ){
692 Int32 kept_rank = kept_ranks[i];
698 }
699 }
700 bool has_new_rank = (my_new_global_rank != (-1));
701
702 // Calcule le min, le max et la somme sur tous les rangs du nombre de nouveaux.
703 // Deux cas peuvent se présenter:
704 // 1. Le min et le max sont égaux et supérieurs ou égaux à 2: Dans ce cas on créé
705 // un HybridParallelMng.
706 // 2. Le max vaut 1. Dans ce cas on créé un nouveau IParallelMng via le MpiParallelMng.
707 // Les rangs actuels pour lequels 'new_local_nb_rank' vaut 0 ne seront pas dans ce
708 // nouveau communicateur. Ce cas concerne aussi le cas où il ne reste plus qu'un
709 // seul rang à la fin.
710
711 Int32 min_new_local_nb_rank = -1;
712 Int32 max_new_local_nb_rank = -1;
713 Int32 sum_new_local_nb_rank = -1;
714 Int32 min_rank = A_NULL_RANK;
715 Int32 max_rank = A_NULL_RANK;
718
719 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF new_local_nb_rank=" << new_local_nb_rank
720 << " min=" << min_new_local_nb_rank
721 << " max=" << max_new_local_nb_rank
722 << " sum=" << sum_new_local_nb_rank
723 << " new_global_rank=" << my_new_global_rank;
724
725 // S'il ne reste qu'un seul rang local, alors on construit uniquement un MpiParallelMng.
726 // Seul le PE qui a un nouveau rang est concerné et fait cela
727 if (max_new_local_nb_rank==1){
728 Integer nb_mpi_rank = m_mpi_parallel_mng->commSize();
729 // Il faut calculer les nouveaux rangs MPI.
730 // Si 'min_new_local_nb_rank' vaut 1, alors c'est simple car cela signifie qu'on garde
731 // tous les rangs MPI actuels (on fait l'équivalent d'un MPI_Comm_dup). Sinon, on
732 // récupère pour chaque rang MPI s'il sera dans le nouveau communicateur et on construit
733 // la liste des rangs conservés en fonction de cela.
734 // NOTE: dans tous les cas il faut faire attention qu'un seul thread utilise le
735 // 'm_mpi_parallel_mng'.
738 bool do_mpi_call = false;
739 if (min_new_local_nb_rank==1){
740 if (has_new_rank){
741 do_mpi_call = true;
743 for( Int32 x=0; x<nb_mpi_rank; ++x )
744 kept_mpi_ranks[x] = x;
745 }
746 }
747 else{
748 // Si je ne suis pas dans le nouveau communicateur, c'est le rang local 0 qui
749 // faut le 'gather'.
751 if (has_new_rank || m_local_rank==0){
752 do_mpi_call = true;
753 Int16 v = (has_new_rank) ? 1 : 0;
754 m_mpi_parallel_mng->allGather(ArrayView<Int16>(1,&v),gathered_ranks);
755 }
756 for( Int32 x=0; x<nb_mpi_rank; ++x )
757 if (gathered_ranks[x]==1)
758 kept_mpi_ranks.add(x);
759 }
760 if (do_mpi_call)
761 return m_mpi_parallel_mng->createSubParallelMngRef(kept_mpi_ranks);
762 else
763 return Ref<IParallelMng>();
764 }
765
767 ARCANE_FATAL("Not same number of new local ranks on every MPI processus: current={0} max={1}",
769
771 ARCANE_FATAL("number of local ranks is too low current={0} minimum=2",new_local_nb_rank);
772
773 // Met une barrière locale pour être sur que tout le monde attend ici.
774 m_thread_barrier->wait();
775
776 // NOTE: Le builder contient les parties communes aux IParallelMng créés. Il faut
777 // donc que ces derniers gardent une référence dessus sinon il sera détruit à la fin
778 // de cette méthode.
780
781 // Le rang 0 créé le builder
782 if (m_local_rank==0){
783 // Suppose qu'on à le même nombre de rangs MPI qu'avant donc on utilise
784 // le communicateur MPI qu'on a déjà.
786 builder = m_sub_builder_factory->_createParallelMngBuilder(new_local_nb_rank,c);
787 // Positionne le builder pour tout le monde
788 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
789 }
790 // Attend pour être sur que tous les threads voit le bon builder.
791 m_thread_barrier->wait();
792
793 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
794 ARCANE_CHECK_POINTER(builder.get());
795
797 if (my_new_local_rank>=0){
798 new_parallel_mng = builder->_createParallelMng(my_new_local_rank,traceMng());
799 }
800 m_thread_barrier->wait();
801
802 // Ici, tout le monde a créé son IParallelMng. On peut donc
803 // supprimer la référence au builder. Les IParallelMng créés gardent
804 // une référence au builder
805 if (m_local_rank==0){
806 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
807 }
808 m_thread_barrier->wait();
809
810 return new_parallel_mng;
811}
812
813/*---------------------------------------------------------------------------*/
814/*---------------------------------------------------------------------------*/
815
818{
819 return m_utils_factory;
820}
821
822/*---------------------------------------------------------------------------*/
823/*---------------------------------------------------------------------------*/
824
825bool HybridParallelMng::
826_isAcceleratorAware() const
827{
828 return m_mpi_parallel_mng->_internalApi()->isAcceleratorAware();
829}
830
831/*---------------------------------------------------------------------------*/
832/*---------------------------------------------------------------------------*/
833
834} // End namespace Arcane::MessagePassing
835
836/*---------------------------------------------------------------------------*/
837/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Tableau d'items de types quelconques.
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface d'une famille d'entités.
Echange d'informations entre processeurs.
virtual Ref< IParallelMngContainer > _createParallelMngBuilder(Int32 nb_local_rank, Parallel::Communicator communicator)=0
Créé un conteneur pour nb_local_rank rangs locaux et avec comme communicateur communicator.
virtual bool isAcceleratorAware() const =0
Indique si l'implémentation gère les accélérateurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:120
Interface d'une file de messages avec les threads.
Implémentation de IRequestList pour HybridParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant les threads.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
void freeRequests(ArrayView< Request > requests) override
Libère les requêtes.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void barrier() override
Effectue une barière.
MP::Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
bool m_is_initialized
true si déjà initialisé
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void build() override
Construit l'instance.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
void initialize() override
Initialise le gestionnaire du parallélisme.
IThreadMng * threadMng() const override
Gestionnaire de threads.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
Int32 m_local_rank
Rang local du processeur actuel.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
IParallelReplication * replication() const override
Informations sur la réplication.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Int32 m_local_nb_rank
Nombre de rang locaux.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
Int32 commRank() const override
Rang de cette instance dans le communicateur.
Int32 m_global_rank
Numéro du processeur actuel.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Integer waitMessages(Parallel::eWaitType wait_type) override
Attend que les messages aient terminé leur exécution.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
void barrier() override
Effectue une barière.
Int32 commRank() const override
Rang de cette instance dans le communicateur.
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
Int32 commSize() const override
Nombre d'instance dans le communicateur.
void allGather(ISerializer *send_serializer, ISerializer *recv_serializer) override
Redéfinit ici allGather pour éviter de cacher le symbole dans les classes dérivées.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
IParallelMngInternal * _internalApi() override
API interne à Arcane.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
virtual void print(ITraceMng *trace)=0
Imprime sur trace les statistiques.
Implémentation d'un tampon pour la sérialisation.
Message utilisant un SerializeBuffer.
Gestionnaire de timer.
Definition TimerMng.h:39
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Exception lorsqu'un argument est invalide.
Vue constante d'un tableau de type T.
virtual bool wait()=0
Bloque et attend que tous les threads appellent cette méthode.
Interface du gestionnaire de traces.
virtual TraceMessage warning()=0
Flot pour un message d'avertissement.
virtual TraceMessage info()=0
Flot pour un message d'information.
Communicateur pour l'échange de message.
Interface du gestionnaire des échanges de messages.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
Requête d'un message.
Definition Request.h:77
Classe de base d'une liste de requêtes.
Exception lorsqu'une fonction n'est pas implémentée.
Exception lorsqu'une opération n'est pas supportée.
TraceMessage info() const
Flot pour un message d'information.
Positionne une classe de message.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Definition Parallel.h:94
IStat * createDefaultStat()
Créé une instance par défaut.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:609
eBlockingType
Type indiquant si un message est bloquant ou non.
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
Infos pour construire un SequentialParallelMng.