Arcane  v3.16.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
HybridParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* HybridParallelMng.cc (C) 2000-2025 */
9/* */
10/* Gestionnaire de parallélisme utilisant un mixte MPI/Threads. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/mpithread/HybridParallelMng.h"
15
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/NumericTypes.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/IThreadBarrier.h"
22#include "arcane/utils/ITraceMng.h"
23
24#include "arcane/core/IIOMng.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ISerializeMessageList.h"
27#include "arcane/core/IItemFamily.h"
28#include "arcane/core/internal/IParallelMngInternal.h"
29#include "arcane/core/internal/SerializeMessage.h"
30#include "arcane/core/parallel/IStat.h"
31
32#include "arcane/parallel/mpithread/HybridParallelDispatch.h"
33#include "arcane/parallel/mpithread/HybridMessageQueue.h"
34#include "arcane/parallel/mpi/MpiParallelMng.h"
35
36#include "arcane/impl/TimerMng.h"
37#include "arcane/impl/ParallelReplication.h"
38#include "arcane/impl/SequentialParallelMng.h"
39#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
40
42#include "arccore/message_passing/RequestListBase.h"
43#include "arccore/message_passing/SerializeMessageList.h"
44
45/*---------------------------------------------------------------------------*/
46/*---------------------------------------------------------------------------*/
47
48namespace Arcane
49{
50extern "C++" IIOMng*
51arcaneCreateIOMng(IParallelMng* psm);
52}
53
55{
56
57/*---------------------------------------------------------------------------*/
58/*---------------------------------------------------------------------------*/
59
60// NOTE: Cette classe n'est plus utilisée. Elle reste pour référence
61// et sera supprimée ultérieurement
62class HybridSerializeMessageList
64{
65 public:
66 class HybridSerializeMessageRequest
67 {
68 public:
69 HybridSerializeMessageRequest(ISerializeMessage* message,Request request)
70 : m_message(message), m_request(request){}
71 public:
72 ISerializeMessage* m_message = nullptr;
73 Request m_request;
74 };
75
76 public:
77
78 explicit HybridSerializeMessageList(HybridParallelMng* mpm)
79 : m_parallel_mng(mpm), m_trace(mpm->traceMng())
80 {
81 }
82
83 public:
84
85 void addMessage(ISerializeMessage* msg) override
86 {
87 m_messages_to_process.add(msg);
88
89 }
90 void processPendingMessages() override
91 {
92 }
93
95 {
96 switch(wait_type){
97 case Parallel::WaitAll:
98 // Pour l'instant seul le mode bloquant est supporté.
99 //m_parallel_mng->processMessages(m_messages_to_process);
100 _wait(Parallel::WaitAll);
101 m_messages_to_process.clear();
102 return (-1);
106 ARCANE_THROW(NotImplementedException,"WaitSomeNonBlocking");
107 }
108 return (-1);
109 }
110
111 private:
112
113 HybridParallelMng* m_parallel_mng;
114 ITraceMng* m_trace;
115 UniqueArray<ISerializeMessage*> m_messages_to_process;
116
117 void _waitAll();
118 void _wait(Parallel::eWaitType wait_mode);
119};
120
121/*---------------------------------------------------------------------------*/
122/*---------------------------------------------------------------------------*/
123
124void HybridSerializeMessageList::
125_wait(Parallel::eWaitType wait_mode)
126{
127 m_trace->info() << "BEGIN PROCESS MESSAGES";
128
129 // TODO: gérer la memoire sans faire de new.
130 ConstArrayView<ISerializeMessage*> messages = m_messages_to_process;
131 HybridMessageQueue* message_queue = m_parallel_mng->m_message_queue;
132 UniqueArray<Request> all_requests;
133 MessageTag HYBRID_MESSAGE_TAG(511);
134 for( ISerializeMessage* sm : messages ){
135 ISerializer* s = sm->serializer();
136 MessageRank orig(sm->source());
137 MessageRank dest(sm->destination());
138 PointToPointMessageInfo message_info(orig,dest,HYBRID_MESSAGE_TAG,Parallel::NonBlocking);
139 Request r;
140 if (sm->isSend())
141 r = message_queue->addSend(message_info,SendBufferInfo(s));
142 else
143 r = message_queue->addReceive(message_info,ReceiveBufferInfo(s));
144 all_requests.add(r);
145 }
146
147 if (wait_mode==Parallel::WaitAll)
148 message_queue->waitAll(all_requests);
149
150 for( ISerializeMessage* sm : messages )
151 sm->setFinished(true);
152}
153
154/*---------------------------------------------------------------------------*/
155/*---------------------------------------------------------------------------*/
156
157/*---------------------------------------------------------------------------*/
158/*---------------------------------------------------------------------------*/
159
160HybridParallelMng::
161HybridParallelMng(const HybridParallelMngBuildInfo& bi)
162: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.local_rank,bi.local_nb_rank))
163, m_trace(bi.trace_mng)
164, m_thread_mng(bi.thread_mng)
165, m_world_parallel_mng(bi.world_parallel_mng)
166, m_io_mng(nullptr)
167, m_timer_mng(nullptr)
168, m_replication(new ParallelReplication())
169, m_message_queue(new HybridMessageQueue(bi.message_queue,bi.mpi_parallel_mng,bi.local_nb_rank))
170, m_is_initialized(false)
171, m_stat(Parallel::createDefaultStat())
172, m_thread_barrier(bi.thread_barrier)
173, m_mpi_parallel_mng(bi.mpi_parallel_mng)
174, m_all_dispatchers(bi.all_dispatchers)
175, m_sub_builder_factory(bi.sub_builder_factory)
176, m_parent_container_ref(bi.container)
177, m_utils_factory(createRef<ParallelMngUtilsFactoryBase>())
178{
179 if (!m_world_parallel_mng)
180 m_world_parallel_mng = this;
181
182 // TODO: vérifier que tous les autres HybridParallelMng ont bien
183 // le même nombre de rang locaux (m_local_nb_rank)
184 m_local_rank = bi.local_rank;
185 m_local_nb_rank = bi.local_nb_rank;
186
187 Int32 mpi_rank = m_mpi_parallel_mng->commRank();
188 Int32 mpi_size = m_mpi_parallel_mng->commSize();
189
190 m_global_rank = m_local_rank + mpi_rank * m_local_nb_rank;
191 m_global_nb_rank = mpi_size * m_local_nb_rank;
192
193 m_is_parallel = m_global_nb_rank!=1;
194}
195
196/*---------------------------------------------------------------------------*/
197/*---------------------------------------------------------------------------*/
198
199HybridParallelMng::
200~HybridParallelMng()
201{
202 m_sequential_parallel_mng.reset();
203 delete m_replication;
204 delete m_io_mng;
205 delete m_message_queue;
206 delete m_timer_mng;
207 delete m_stat;
208 delete m_mpi_parallel_mng;
209}
210
211/*---------------------------------------------------------------------------*/
212/*---------------------------------------------------------------------------*/
213
214namespace
215{
216// Classe pour créer les différents dispatchers
217class DispatchCreator
218{
219 public:
220 DispatchCreator(ITraceMng* tm,HybridParallelMng* mpm,HybridMessageQueue* message_queue,MpiThreadAllDispatcher* all_dispatchers)
221 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue), m_all_dispatchers(all_dispatchers){}
222 public:
223 template<typename DataType> HybridParallelDispatch<DataType>*
224 create()
225 {
226 HybridMessageQueue* tmq = m_message_queue;
227 MpiThreadAllDispatcher* ad = m_all_dispatchers;
228 auto field = ad->instance((DataType*)nullptr).view();
229 return new HybridParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
230 }
231
232 ITraceMng* m_tm;
233 HybridParallelMng* m_mpm;
234 HybridMessageQueue* m_message_queue;
235 MpiThreadAllDispatcher* m_all_dispatchers;
236};
237}
238
239/*---------------------------------------------------------------------------*/
240/*---------------------------------------------------------------------------*/
241
243build()
244{
245 ITraceMng* tm = traceMng();
246 tm->info() << "Initialise HybridParallelMng"
247 << " global_rank=" << m_global_rank
248 << " local_rank=" << m_local_rank
249 << " mpi_rank=" << m_mpi_parallel_mng->commRank();
250
251 m_timer_mng = new TimerMng(tm);
252
253 // Créé le gestionnaire séquentiel associé.
254 {
256 bi.setTraceMng(traceMng());
257 bi.setCommunicator(communicator());
258 bi.setThreadMng(threadMng());
259 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
260 }
261
262 DispatchCreator creator(m_trace,this,m_message_queue,m_all_dispatchers);
263 this->createDispatchers(creator);
264 m_io_mng = arcaneCreateIOMng(this);
265}
266
267/*---------------------------------------------------------------------------*/
268/*---------------------------------------------------------------------------*/
269
270/*----------------------------------------------------------------------------*/
271/*---------------------------------------------------------------------------*/
272
275{
276 Trace::Setter mci(m_trace,"Thread");
277 if (m_is_initialized){
278 m_trace->warning() << "HybridParallelMng already initialized";
279 return;
280 }
281
282 m_is_initialized = true;
283}
284
285/*---------------------------------------------------------------------------*/
286/*---------------------------------------------------------------------------*/
287
288SerializeBuffer* HybridParallelMng::
289_castSerializer(ISerializer* serializer)
290{
291 auto sbuf = dynamic_cast<SerializeBuffer*>(serializer);
292 if (!sbuf)
293 ARCANE_THROW(ArgumentException,"can not cast 'ISerializer' to 'SerializeBuffer'");
294 return sbuf;
295}
296
297/*---------------------------------------------------------------------------*/
298/*---------------------------------------------------------------------------*/
299
302{
303 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
304}
305
308{
309 return m_utils_factory->createTransferValuesOperation(this)._release();
310}
311
314{
315 return m_utils_factory->createExchanger(this)._release();
316}
317
318/*---------------------------------------------------------------------------*/
319/*---------------------------------------------------------------------------*/
320
321/*---------------------------------------------------------------------------*/
322/*---------------------------------------------------------------------------*/
323
324void HybridParallelMng::
325sendSerializer(ISerializer* s,Int32 rank)
326{
327 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
328 Request r = m_message_queue->addSend(p2p_message,s);
329 m_message_queue->waitAll(ArrayView<Request>(1,&r));
330}
331
332/*---------------------------------------------------------------------------*/
333/*---------------------------------------------------------------------------*/
334
335auto HybridParallelMng::
336sendSerializer(ISerializer* s,Int32 rank,ByteArray& bytes) -> Request
337{
338 ARCANE_UNUSED(bytes);
339 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
340 return m_message_queue->addSend(p2p_message,s);
341}
342
343/*---------------------------------------------------------------------------*/
344/*---------------------------------------------------------------------------*/
345
348{
349 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
350}
351
352/*---------------------------------------------------------------------------*/
353/*---------------------------------------------------------------------------*/
354
355void HybridParallelMng::
356broadcastSerializer(ISerializer* values,Int32 rank)
357{
358 Timer::Phase tphase(timeStats(),TP_Communication);
359 SerializeBuffer* sbuf = _castSerializer(values);
360
361 bool is_broadcaster = (rank==commRank());
362
363 // Effectue l'envoie en deux phases. Envoie d'abord le nombre d'éléments
364 // puis envoie les éléments.
365 // TODO: il serait possible de le faire en une fois pour les messages
366 // ne dépassant pas une certaine taille.
367
369 if (is_broadcaster){
370 Int64 total_size = sbuf->totalSize();
371 Span<Byte> bytes = sbuf->globalBuffer();
372 this->broadcast(Int64ArrayView(1,&total_size),rank);
373 mpBroadcast(mpm,bytes,rank);
374 }
375 else{
376 Int64 total_size = 0;
377 this->broadcast(Int64ArrayView(1,&total_size),rank);
378 sbuf->preallocate(total_size);
379 Span<Byte> bytes = sbuf->globalBuffer();
380 mpBroadcast(mpm,bytes,rank);
381 sbuf->setFromSizes();
382 }
383}
384
385/*---------------------------------------------------------------------------*/
386/*---------------------------------------------------------------------------*/
387
388void HybridParallelMng::
389recvSerializer(ISerializer* s,Int32 rank)
390{
391 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
392 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
393 m_message_queue->waitAll(ArrayView<Request>(1,&r));
394}
395
396/*---------------------------------------------------------------------------*/
397/*---------------------------------------------------------------------------*/
398
401{
402 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
403}
404
405/*---------------------------------------------------------------------------*/
406/*---------------------------------------------------------------------------*/
407
410{
411 ARCANE_UNUSED(requests);
412 throw NotImplementedException(A_FUNCINFO);
413}
414
415/*---------------------------------------------------------------------------*/
416/*---------------------------------------------------------------------------*/
417
419probe(const PointToPointMessageInfo& message)
420{
421 PointToPointMessageInfo p2p_message(message);
423 return m_message_queue->probe(p2p_message);
424}
425
426/*---------------------------------------------------------------------------*/
427/*---------------------------------------------------------------------------*/
428
431{
432 PointToPointMessageInfo p2p_message(message);
434 return m_message_queue->legacyProbe(p2p_message);
435}
436
437/*---------------------------------------------------------------------------*/
438/*---------------------------------------------------------------------------*/
439
440Request HybridParallelMng::
441sendSerializer(const ISerializer* s,const PointToPointMessageInfo& message)
442{
443 auto p2p_message = buildMessage(message);
444 return m_message_queue->addSend(p2p_message,s);
445}
446
447/*---------------------------------------------------------------------------*/
448/*---------------------------------------------------------------------------*/
449
450Request HybridParallelMng::
451receiveSerializer(ISerializer* s,const PointToPointMessageInfo& message)
452{
453 auto p2p_message = buildMessage(message);
454 return m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
455}
456
457/*---------------------------------------------------------------------------*/
458/*---------------------------------------------------------------------------*/
459
462{
463 if (m_stat)
464 m_stat->print(m_trace);
465}
466
467/*---------------------------------------------------------------------------*/
468/*---------------------------------------------------------------------------*/
469
471barrier()
472{
473 m_thread_barrier->wait();
474 if (m_local_rank==0)
475 m_mpi_parallel_mng->barrier();
476 m_thread_barrier->wait();
477}
478
479/*---------------------------------------------------------------------------*/
480/*---------------------------------------------------------------------------*/
481
482ISerializeMessageList* HybridParallelMng::
483_createSerializeMessageList()
484{
485 auto* x = new MP::internal::SerializeMessageList(messagePassingMng());
486 x->setAllowAnyRankReceive(false);
487 return x;
488}
489
490/*---------------------------------------------------------------------------*/
491/*---------------------------------------------------------------------------*/
492
495{
496 return m_utils_factory->createSynchronizer(this,family)._release();
497}
498
499/*---------------------------------------------------------------------------*/
500/*---------------------------------------------------------------------------*/
501
503createSynchronizer(const ItemGroup& group)
504{
505 return m_utils_factory->createSynchronizer(this,group)._release();
506}
507
508/*---------------------------------------------------------------------------*/
509/*---------------------------------------------------------------------------*/
510
513{
514 return m_utils_factory->createTopology(this)._release();
515}
516
517/*---------------------------------------------------------------------------*/
518/*---------------------------------------------------------------------------*/
519
521replication() const
522{
523 return m_replication;
524}
525
526/*---------------------------------------------------------------------------*/
527/*---------------------------------------------------------------------------*/
528
531{
532 delete m_replication;
533 m_replication = v;
534}
535
536/*---------------------------------------------------------------------------*/
537/*---------------------------------------------------------------------------*/
538
541{
542 return m_sequential_parallel_mng.get();
543}
544
545/*---------------------------------------------------------------------------*/
546/*---------------------------------------------------------------------------*/
547
548Ref<IParallelMng> HybridParallelMng::
549sequentialParallelMngRef()
550{
551 return m_sequential_parallel_mng;
552}
553
554/*---------------------------------------------------------------------------*/
555/*---------------------------------------------------------------------------*/
561{
563 public:
564 RequestList(HybridParallelMng* pm)
565 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
566 m_local_rank(m_parallel_mng->localRank()) {}
567 public:
568 void _wait(Parallel::eWaitType wait_type) override
569 {
570 switch(wait_type){
571 case Parallel::WaitAll:
572 m_parallel_mng->m_message_queue->waitAll(_requests());
573 break;
575 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),false);
576 break;
578 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),true);
579 }
580 }
581 private:
582 HybridParallelMng* m_parallel_mng;
583 HybridMessageQueue* m_message_queue;
585};
586
587/*---------------------------------------------------------------------------*/
588/*---------------------------------------------------------------------------*/
589
596
597/*---------------------------------------------------------------------------*/
598/*---------------------------------------------------------------------------*/
599
602{
603 m_message_queue->waitAll(requests);
604}
605
606/*---------------------------------------------------------------------------*/
607/*---------------------------------------------------------------------------*/
608
611{
612 return m_mpi_parallel_mng->getMPICommunicator();
613}
614
615/*---------------------------------------------------------------------------*/
616/*---------------------------------------------------------------------------*/
617
618MP::Communicator HybridParallelMng::
619communicator() const
620{
621 return m_mpi_parallel_mng->communicator();
622}
623
624/*---------------------------------------------------------------------------*/
625/*---------------------------------------------------------------------------*/
626
629{
630 PointToPointMessageInfo p2p_message(message);
631 p2p_message.setEmiterRank(MessageRank(m_global_rank));
632 return p2p_message;
633}
634
635/*---------------------------------------------------------------------------*/
636/*---------------------------------------------------------------------------*/
637
640{
641 return buildMessage({MessageRank(dest),blocking_mode});
642}
643
644/*---------------------------------------------------------------------------*/
645/*---------------------------------------------------------------------------*/
646
647IParallelMng* HybridParallelMng::
648_createSubParallelMng(Int32ConstArrayView kept_ranks)
649{
650 ARCANE_UNUSED(kept_ranks);
651 ARCANE_THROW(NotSupportedException,"Use createSubParallelMngRef() instead");
652}
653
654/*---------------------------------------------------------------------------*/
655/*---------------------------------------------------------------------------*/
656
659{
660 // ATTENTION: Cette méthode est appelée simultanément par tous les threads
661 // partageant cet HybridParallelMng.
662
663 if (kept_ranks.empty())
664 ARCANE_FATAL("kept_ranks is empty");
665 ARCANE_CHECK_POINTER(m_sub_builder_factory);
666
667 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF";
668
669 /*
670 Il existe plusieurs possibilités:
671 1. on réduit juste le nombre de rangs en mémoire partagé pour chaque
672 processus MPI -< on créé un HybridParallelMng
673 2. On ne garde que le rang maitre de chaque processus MPI -> on créé un MpiParallelMng.
674 3. On ne garde que les rangs d'un même processus -> on créé un SharedMemoryParallelMng
675 4. On ne garde qu'un seul rang: on crée un MpiSequentialParallelMng.
676 */
677 // Pour l'instant, on ne supporte que le cas 1 et 2.
678 Int32 nb_kept_rank = kept_ranks.size();
679
680 // Détermine le nouveau nombre de rangs locaux par rang MPI.
681
682 // Regarde si je suis dans les listes des rangs conservés et si oui
683 // détermine mon rang dans le IParallelMng créé
684 Int32 first_global_rank_in_this_mpi = m_global_rank - m_local_rank;
685 Int32 last_global_rank_in_this_mpi = first_global_rank_in_this_mpi + m_local_nb_rank - 1;
686 // Mon nouveau rang local. Négatif si je ne suis pas dans le nouveau communicateur
687 Int32 my_new_global_rank = (-1);
688 Int32 new_local_nb_rank = 0;
689 Int32 my_new_local_rank = (-1);
690 for( Integer i=0; i<nb_kept_rank; ++i ){
691 Int32 kept_rank = kept_ranks[i];
692 if (kept_rank>=first_global_rank_in_this_mpi && kept_rank<last_global_rank_in_this_mpi)
693 ++new_local_nb_rank;
694 if (kept_rank==m_global_rank){
695 my_new_global_rank = i;
696 my_new_local_rank = new_local_nb_rank - 1;
697 }
698 }
699 bool has_new_rank = (my_new_global_rank != (-1));
700
701 // Calcule le min, le max et la somme sur tous les rangs du nombre de nouveaux.
702 // Deux cas peuvent se présenter:
703 // 1. Le min et le max sont égaux et supérieurs ou égaux à 2: Dans ce cas on créé
704 // un HybridParallelMng.
705 // 2. Le max vaut 1. Dans ce cas on créé un nouveau IParallelMng via le MpiParallelMng.
706 // Les rangs actuels pour lequels 'new_local_nb_rank' vaut 0 ne seront pas dans ce
707 // nouveau communicateur. Ce cas concerne aussi le cas où il ne reste plus qu'un
708 // seul rang à la fin.
709
710 Int32 min_new_local_nb_rank = -1;
711 Int32 max_new_local_nb_rank = -1;
712 Int32 sum_new_local_nb_rank = -1;
713 Int32 min_rank = A_NULL_RANK;
714 Int32 max_rank = A_NULL_RANK;
715 computeMinMaxSum(new_local_nb_rank,min_new_local_nb_rank,max_new_local_nb_rank,
716 sum_new_local_nb_rank,min_rank,max_rank);
717
718 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF new_local_nb_rank=" << new_local_nb_rank
719 << " min=" << min_new_local_nb_rank
720 << " max=" << max_new_local_nb_rank
721 << " sum=" << sum_new_local_nb_rank
722 << " new_global_rank=" << my_new_global_rank;
723
724 // S'il ne reste qu'un seul rang local, alors on construit uniquement un MpiParallelMng.
725 // Seul le PE qui a un nouveau rang est concerné et fait cela
726 if (max_new_local_nb_rank==1){
727 Integer nb_mpi_rank = m_mpi_parallel_mng->commSize();
728 // Il faut calculer les nouveaux rangs MPI.
729 // Si 'min_new_local_nb_rank' vaut 1, alors c'est simple car cela signifie qu'on garde
730 // tous les rangs MPI actuels (on fait l'équivalent d'un MPI_Comm_dup). Sinon, on
731 // récupère pour chaque rang MPI s'il sera dans le nouveau communicateur et on construit
732 // la liste des rangs conservés en fonction de cela.
733 // NOTE: dans tous les cas il faut faire attention qu'un seul thread utilise le
734 // 'm_mpi_parallel_mng'.
735 UniqueArray<Int32> kept_mpi_ranks;
737 bool do_mpi_call = false;
738 if (min_new_local_nb_rank==1){
739 if (has_new_rank){
740 do_mpi_call = true;
741 kept_mpi_ranks.resize(nb_mpi_rank);
742 for( Int32 x=0; x<nb_mpi_rank; ++x )
743 kept_mpi_ranks[x] = x;
744 }
745 }
746 else{
747 // Si je ne suis pas dans le nouveau communicateur, c'est le rang local 0 qui
748 // faut le 'gather'.
749 UniqueArray<Int16> gathered_ranks(nb_mpi_rank);
750 if (has_new_rank || m_local_rank==0){
751 do_mpi_call = true;
752 Int16 v = (has_new_rank) ? 1 : 0;
753 m_mpi_parallel_mng->allGather(ArrayView<Int16>(1,&v),gathered_ranks);
754 }
755 for( Int32 x=0; x<nb_mpi_rank; ++x )
756 if (gathered_ranks[x]==1)
757 kept_mpi_ranks.add(x);
758 }
759 if (do_mpi_call)
760 return m_mpi_parallel_mng->createSubParallelMngRef(kept_mpi_ranks);
761 else
762 return Ref<IParallelMng>();
763 }
764
765 if (max_new_local_nb_rank!=new_local_nb_rank)
766 ARCANE_FATAL("Not same number of new local ranks on every MPI processus: current={0} max={1}",
767 new_local_nb_rank,max_new_local_nb_rank);
768
769 if (max_new_local_nb_rank<2)
770 ARCANE_FATAL("number of local ranks is too low current={0} minimum=2",new_local_nb_rank);
771
772 // Met une barrière locale pour être sur que tout le monde attend ici.
773 m_thread_barrier->wait();
774
775 // NOTE: Le builder contient les parties communes aux IParallelMng créés. Il faut
776 // donc que ces derniers gardent une référence dessus sinon il sera détruit à la fin
777 // de cette méthode.
779
780 // Le rang 0 créé le builder
781 if (m_local_rank==0){
782 // Suppose qu'on à le même nombre de rangs MPI qu'avant donc on utilise
783 // le communicateur MPI qu'on a déjà.
784 MP::Communicator c = communicator();
785 builder = m_sub_builder_factory->_createParallelMngBuilder(new_local_nb_rank,c);
786 // Positionne le builder pour tout le monde
787 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
788 }
789 // Attend pour être sur que tous les threads voit le bon builder.
790 m_thread_barrier->wait();
791
792 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
793 ARCANE_CHECK_POINTER(builder.get());
794
795 Ref<IParallelMng> new_parallel_mng;
796 if (my_new_local_rank>=0){
797 new_parallel_mng = builder->_createParallelMng(my_new_local_rank,traceMng());
798 }
799 m_thread_barrier->wait();
800
801 // Ici, tout le monde a créé son IParallelMng. On peut donc
802 // supprimer la référence au builder. Les IParallelMng créés gardent
803 // une référence au builder
804 if (m_local_rank==0){
805 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
806 }
807 m_thread_barrier->wait();
808
809 return new_parallel_mng;
810}
811
812/*---------------------------------------------------------------------------*/
813/*---------------------------------------------------------------------------*/
814
817{
818 return m_utils_factory;
819}
820
821/*---------------------------------------------------------------------------*/
822/*---------------------------------------------------------------------------*/
823
824bool HybridParallelMng::
825_isAcceleratorAware() const
826{
827 return m_mpi_parallel_mng->_internalApi()->isAcceleratorAware();
828}
829
830/*---------------------------------------------------------------------------*/
831/*---------------------------------------------------------------------------*/
832
833} // End namespace Arcane::MessagePassing
834
835/*---------------------------------------------------------------------------*/
836/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Exception lorsqu'un argument est invalide.
Vue modifiable d'un tableau d'un type T.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
constexpr bool empty() const noexcept
true si le tableau est vide (size()==0)
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:42
Interface d'une famille d'entités.
Definition IItemFamily.h:84
Echange d'informations entre processeurs.
virtual bool isAcceleratorAware() const =0
Indique si l'implémentation gère les accélérateurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface du gestionnaire de traces.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Interface d'une file de messages avec les threads.
Implémentation de IRequestList pour HybridParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant les threads.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
void freeRequests(ArrayView< Request > requests) override
Libère les requêtes.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void barrier() override
Effectue une barière.
MP::Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
bool m_is_initialized
true si déjà initialisé
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void build() override
Construit l'instance.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
void initialize() override
Initialise le gestionnaire du parallélisme.
IThreadMng * threadMng() const override
Gestionnaire de threads.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
Int32 m_local_rank
Rang local du processeur actuel.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
IParallelReplication * replication() const override
Informations sur la réplication.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Int32 m_local_nb_rank
Nombre de rang locaux.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
Int32 commRank() const override
Rang de cette instance dans le communicateur.
Int32 m_global_rank
Numéro du processeur actuel.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Integer waitMessages(Parallel::eWaitType wait_type) override
Attend que les messages aient terminé leur exécution.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
Interface du gestionnaire des échanges de messages.
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
void setEmiterRank(MessageRank rank)
Positionne le rang de l'émetteur du message.
Informations des buffers de réception.
Requête d'un message.
Definition Request.h:77
Classe de base d'une liste de requêtes.
Exception lorsqu'une fonction n'est pas implémentée.
IParallelMngInternal * _internalApi() override
API interne à Arcane.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Implémentation d'un tampon pour la sérialisation.
Vue d'un tableau d'éléments de type T.
Definition Span.h:513
Gestionnaire de timer.
Definition TimerMng.h:39
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Definition Parallel.h:50
C void mpBroadcast(IMessagePassingMng *pm, Span< char > send_buf, Int32 rank)
@ WaitSome
Attend que tous les messages de la liste soient traités.
eBlockingType
Type indiquant si un message est bloquant ou non.
IStat * createDefaultStat()
Créé une instance par défaut.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:538
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:208
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:569
std::int16_t Int16
Type entier signé sur 16 bits.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un HybridParallelMng.
Infos pour construire un SequentialParallelMng.