Arcane  v4.1.7.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
HybridParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* HybridParallelMng.cc (C) 2000-2026 */
9/* */
10/* Gestionnaire de parallélisme utilisant un mixte MPI/Threads. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/mpithread/HybridParallelMng.h"
15
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/NumericTypes.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/IThreadBarrier.h"
22#include "arcane/utils/ITraceMng.h"
23
24#include "arcane/core/IIOMng.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ISerializeMessageList.h"
27#include "arcane/core/IItemFamily.h"
28#include "arcane/core/IParallelTopology.h"
29#include "arcane/core/internal/ParallelMngInternal.h"
30#include "arcane/core/internal/SerializeMessage.h"
31#include "arcane/core/internal/MachineShMemWinMemoryAllocator.h"
32#include "arcane/core/parallel/IStat.h"
33
34#include "arcane/parallel/mpithread/HybridParallelDispatch.h"
35#include "arcane/parallel/mpithread/HybridMessageQueue.h"
36#include "arcane/parallel/mpithread/internal/HybridContigMachineShMemWinBaseInternalCreator.h"
37#include "arcane/parallel/mpithread/internal/HybridContigMachineShMemWinBaseInternal.h"
38#include "arcane/parallel/mpithread/internal/HybridMachineShMemWinBaseInternal.h"
39
40#include "arcane/parallel/mpi/MpiParallelMng.h"
41
42#include "arcane/impl/TimerMng.h"
43#include "arcane/impl/ParallelReplication.h"
44#include "arcane/impl/SequentialParallelMng.h"
45#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
46
48#include "arccore/message_passing/RequestListBase.h"
49#include "arccore/message_passing/internal/SerializeMessageList.h"
50
51/*---------------------------------------------------------------------------*/
52/*---------------------------------------------------------------------------*/
53
54namespace Arcane
55{
56extern "C++" IIOMng*
57arcaneCreateIOMng(IParallelMng* psm);
58}
59
61{
62
63/*---------------------------------------------------------------------------*/
64/*---------------------------------------------------------------------------*/
65
66// NOTE: Cette classe n'est plus utilisée. Elle reste pour référence
67// et sera supprimée ultérieurement
68class HybridSerializeMessageList
70{
71 public:
72 class HybridSerializeMessageRequest
73 {
74 public:
75 HybridSerializeMessageRequest(ISerializeMessage* message,Request request)
76 : m_message(message), m_request(request){}
77 public:
78 ISerializeMessage* m_message = nullptr;
79 Request m_request;
80 };
81
82 public:
83
84 explicit HybridSerializeMessageList(HybridParallelMng* mpm)
85 : m_parallel_mng(mpm), m_trace(mpm->traceMng())
86 {
87 }
88
89 public:
90
91 void addMessage(ISerializeMessage* msg) override
92 {
93 m_messages_to_process.add(msg);
94
95 }
96 void processPendingMessages() override
97 {
98 }
99
101 {
102 switch(wait_type){
103 case Parallel::WaitAll:
104 // Pour l'instant seul le mode bloquant est supporté.
105 //m_parallel_mng->processMessages(m_messages_to_process);
106 _wait(Parallel::WaitAll);
107 m_messages_to_process.clear();
108 return (-1);
112 ARCANE_THROW(NotImplementedException,"WaitSomeNonBlocking");
113 }
114 return (-1);
115 }
116
117 private:
118
119 HybridParallelMng* m_parallel_mng;
120 ITraceMng* m_trace;
121 UniqueArray<ISerializeMessage*> m_messages_to_process;
122
123 void _waitAll();
124 void _wait(Parallel::eWaitType wait_mode);
125};
126
127/*---------------------------------------------------------------------------*/
128/*---------------------------------------------------------------------------*/
129
130void HybridSerializeMessageList::
131_wait(Parallel::eWaitType wait_mode)
132{
133 m_trace->info() << "BEGIN PROCESS MESSAGES";
134
135 // TODO: gérer la memoire sans faire de new.
136 ConstArrayView<ISerializeMessage*> messages = m_messages_to_process;
137 HybridMessageQueue* message_queue = m_parallel_mng->m_message_queue;
138 UniqueArray<Request> all_requests;
139 MessageTag HYBRID_MESSAGE_TAG(511);
140 for( ISerializeMessage* sm : messages ){
141 ISerializer* s = sm->serializer();
142 MessageRank orig(sm->source());
143 MessageRank dest(sm->destination());
144 PointToPointMessageInfo message_info(orig,dest,HYBRID_MESSAGE_TAG,Parallel::NonBlocking);
145 Request r;
146 if (sm->isSend())
147 r = message_queue->addSend(message_info,SendBufferInfo(s));
148 else
149 r = message_queue->addReceive(message_info,ReceiveBufferInfo(s));
150 all_requests.add(r);
151 }
152
153 if (wait_mode==Parallel::WaitAll)
154 message_queue->waitAll(all_requests);
155
156 for( ISerializeMessage* sm : messages )
157 sm->setFinished(true);
158}
159
160/*---------------------------------------------------------------------------*/
161/*---------------------------------------------------------------------------*/
162
163/*---------------------------------------------------------------------------*/
164/*---------------------------------------------------------------------------*/
165
167: public ParallelMngInternal
168{
169 public:
170
171 explicit Impl(HybridParallelMng* pm, HybridContigMachineShMemWinBaseInternalCreator* window_creator)
172 : ParallelMngInternal(pm)
173 , m_parallel_mng(pm)
174 , m_window_creator(window_creator)
175 , m_alloc(makeRef(new MachineShMemWinMemoryAllocator(pm)))
176 {}
177
178 ~Impl() override = default;
179
180 public:
181
183 {
184 if (m_shmem_available == 1) {
185 return true;
186 }
187
188 if (m_shmem_available == 0) {
189 Ref<IParallelTopology> topo = m_parallel_mng->_internalUtilsFactory()->createTopology(m_parallel_mng);
190 if (topo->machineRanks().size() == m_window_creator->machineRanks(m_parallel_mng->commRank(), m_parallel_mng->mpiParallelMng()).size()) {
191 m_shmem_available = 1;
192 return true;
193 }
194 // Problème avec MPI. Peut intervenir si MPICH est compilé en mode ch3:sock.
195 m_shmem_available = 2;
196 return false;
197 }
198
199 return false;
200 }
201
203 {
204 return makeRef(m_window_creator->createWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
205 }
206
208 {
209 return makeRef(m_window_creator->createDynamicWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
210 }
211
213 {
214 return MemoryAllocationOptions{ m_alloc.get() };
215 }
216
217 private:
218
219 HybridParallelMng* m_parallel_mng;
222
223 // 0 = Variable non initialisé
224 // 1 = Mémoire partagée dispo
225 // 2 = Mémoire partagée non dispo
226 Int8 m_shmem_available = 0;
227};
228
229/*---------------------------------------------------------------------------*/
230/*---------------------------------------------------------------------------*/
231
232/*---------------------------------------------------------------------------*/
233/*---------------------------------------------------------------------------*/
234
235HybridParallelMng::
236HybridParallelMng(const HybridParallelMngBuildInfo& bi)
237: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.local_rank,bi.local_nb_rank))
238, m_trace(bi.trace_mng)
239, m_thread_mng(bi.thread_mng)
240, m_world_parallel_mng(bi.world_parallel_mng)
241, m_io_mng(nullptr)
242, m_timer_mng(nullptr)
243, m_replication(new ParallelReplication())
244, m_message_queue(new HybridMessageQueue(bi.message_queue,bi.mpi_parallel_mng,bi.local_nb_rank))
245, m_is_initialized(false)
246, m_stat(Parallel::createDefaultStat())
247, m_thread_barrier(bi.thread_barrier)
248, m_mpi_parallel_mng(bi.mpi_parallel_mng)
249, m_all_dispatchers(bi.all_dispatchers)
250, m_sub_builder_factory(bi.sub_builder_factory)
251, m_parent_container_ref(bi.container)
252, m_utils_factory(createRef<ParallelMngUtilsFactoryBase>())
253, m_parallel_mng_internal(new Impl(this, bi.window_creator))
254{
255 if (!m_world_parallel_mng)
256 m_world_parallel_mng = this;
257
258 // TODO: vérifier que tous les autres HybridParallelMng ont bien
259 // le même nombre de rang locaux (m_local_nb_rank)
260 m_local_rank = bi.local_rank;
261 m_local_nb_rank = bi.local_nb_rank;
262
263 Int32 mpi_rank = m_mpi_parallel_mng->commRank();
264 Int32 mpi_size = m_mpi_parallel_mng->commSize();
265
268
269 m_is_parallel = m_global_nb_rank!=1;
270}
271
272/*---------------------------------------------------------------------------*/
273/*---------------------------------------------------------------------------*/
274
275HybridParallelMng::
276~HybridParallelMng()
277{
278 m_sequential_parallel_mng.reset();
279 delete m_replication;
280 delete m_io_mng;
281 delete m_message_queue;
282 delete m_timer_mng;
283 delete m_stat;
284 delete m_mpi_parallel_mng;
285 delete m_parallel_mng_internal;
286}
287
288/*---------------------------------------------------------------------------*/
289/*---------------------------------------------------------------------------*/
290
291namespace
292{
293// Classe pour créer les différents dispatchers
294class DispatchCreator
295{
296 public:
297 DispatchCreator(ITraceMng* tm,HybridParallelMng* mpm,HybridMessageQueue* message_queue,MpiThreadAllDispatcher* all_dispatchers)
298 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue), m_all_dispatchers(all_dispatchers){}
299 public:
300 template<typename DataType> HybridParallelDispatch<DataType>*
301 create()
302 {
303 HybridMessageQueue* tmq = m_message_queue;
304 MpiThreadAllDispatcher* ad = m_all_dispatchers;
305 auto field = ad->instance((DataType*)nullptr).view();
306 return new HybridParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
307 }
308
309 ITraceMng* m_tm;
310 HybridParallelMng* m_mpm;
311 HybridMessageQueue* m_message_queue;
312 MpiThreadAllDispatcher* m_all_dispatchers;
313};
314}
315
316/*---------------------------------------------------------------------------*/
317/*---------------------------------------------------------------------------*/
318
320build()
321{
322 ITraceMng* tm = traceMng();
323 tm->info() << "Initialise HybridParallelMng"
324 << " global_rank=" << m_global_rank
325 << " local_rank=" << m_local_rank
326 << " mpi_rank=" << m_mpi_parallel_mng->commRank();
327
328 m_timer_mng = new TimerMng(tm);
329
330 // Créé le gestionnaire séquentiel associé.
331 {
333 bi.setTraceMng(traceMng());
334 bi.setCommunicator(communicator());
335 bi.setThreadMng(threadMng());
336 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
337 }
338
339 DispatchCreator creator(m_trace,this,m_message_queue,m_all_dispatchers);
340 this->createDispatchers(creator);
341 m_io_mng = arcaneCreateIOMng(this);
342}
343
344/*---------------------------------------------------------------------------*/
345/*---------------------------------------------------------------------------*/
346
347/*----------------------------------------------------------------------------*/
348/*---------------------------------------------------------------------------*/
349
352{
353 Trace::Setter mci(m_trace,"Thread");
354 if (m_is_initialized){
355 m_trace->warning() << "HybridParallelMng already initialized";
356 return;
357 }
358
359 m_is_initialized = true;
360}
361
362/*---------------------------------------------------------------------------*/
363/*---------------------------------------------------------------------------*/
364
365SerializeBuffer* HybridParallelMng::
366_castSerializer(ISerializer* serializer)
367{
368 auto sbuf = dynamic_cast<SerializeBuffer*>(serializer);
369 if (!sbuf)
370 ARCANE_THROW(ArgumentException,"can not cast 'ISerializer' to 'SerializeBuffer'");
371 return sbuf;
372}
373
374/*---------------------------------------------------------------------------*/
375/*---------------------------------------------------------------------------*/
376
379{
380 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
381}
382
385{
386 return m_utils_factory->createTransferValuesOperation(this)._release();
387}
388
391{
392 return m_utils_factory->createExchanger(this)._release();
393}
394
395/*---------------------------------------------------------------------------*/
396/*---------------------------------------------------------------------------*/
397
398/*---------------------------------------------------------------------------*/
399/*---------------------------------------------------------------------------*/
400
401void HybridParallelMng::
402sendSerializer(ISerializer* s,Int32 rank)
403{
404 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
405 Request r = m_message_queue->addSend(p2p_message,s);
406 m_message_queue->waitAll(ArrayView<Request>(1,&r));
407}
408
409/*---------------------------------------------------------------------------*/
410/*---------------------------------------------------------------------------*/
411
412auto HybridParallelMng::
413sendSerializer(ISerializer* s,Int32 rank,ByteArray& bytes) -> Request
414{
415 ARCANE_UNUSED(bytes);
416 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
417 return m_message_queue->addSend(p2p_message,s);
418}
419
420/*---------------------------------------------------------------------------*/
421/*---------------------------------------------------------------------------*/
422
425{
426 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
427}
428
429/*---------------------------------------------------------------------------*/
430/*---------------------------------------------------------------------------*/
431
432void HybridParallelMng::
433broadcastSerializer(ISerializer* values,Int32 rank)
434{
435 Timer::Phase tphase(timeStats(),TP_Communication);
436 SerializeBuffer* sbuf = _castSerializer(values);
437
438 bool is_broadcaster = (rank==commRank());
439
440 // Effectue l'envoie en deux phases. Envoie d'abord le nombre d'éléments
441 // puis envoie les éléments.
442 // TODO: il serait possible de le faire en une fois pour les messages
443 // ne dépassant pas une certaine taille.
444
446 if (is_broadcaster){
447 Int64 total_size = sbuf->totalSize();
448 Span<Byte> bytes = sbuf->globalBuffer();
449 this->broadcast(Int64ArrayView(1,&total_size),rank);
450 mpBroadcast(mpm,bytes,rank);
451 }
452 else{
453 Int64 total_size = 0;
454 this->broadcast(Int64ArrayView(1,&total_size),rank);
455 sbuf->preallocate(total_size);
456 Span<Byte> bytes = sbuf->globalBuffer();
457 mpBroadcast(mpm,bytes,rank);
458 sbuf->setFromSizes();
459 }
460}
461
462/*---------------------------------------------------------------------------*/
463/*---------------------------------------------------------------------------*/
464
465void HybridParallelMng::
466recvSerializer(ISerializer* s,Int32 rank)
467{
468 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
469 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
470 m_message_queue->waitAll(ArrayView<Request>(1,&r));
471}
472
473/*---------------------------------------------------------------------------*/
474/*---------------------------------------------------------------------------*/
475
478{
479 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
480}
481
482/*---------------------------------------------------------------------------*/
483/*---------------------------------------------------------------------------*/
484
487{
488 ARCANE_UNUSED(requests);
489 throw NotImplementedException(A_FUNCINFO);
490}
491
492/*---------------------------------------------------------------------------*/
493/*---------------------------------------------------------------------------*/
494
496probe(const PointToPointMessageInfo& message)
497{
498 PointToPointMessageInfo p2p_message(message);
500 return m_message_queue->probe(p2p_message);
501}
502
503/*---------------------------------------------------------------------------*/
504/*---------------------------------------------------------------------------*/
505
508{
509 PointToPointMessageInfo p2p_message(message);
511 return m_message_queue->legacyProbe(p2p_message);
512}
513
514/*---------------------------------------------------------------------------*/
515/*---------------------------------------------------------------------------*/
516
517Request HybridParallelMng::
518sendSerializer(const ISerializer* s,const PointToPointMessageInfo& message)
519{
520 auto p2p_message = buildMessage(message);
521 return m_message_queue->addSend(p2p_message,s);
522}
523
524/*---------------------------------------------------------------------------*/
525/*---------------------------------------------------------------------------*/
526
527Request HybridParallelMng::
528receiveSerializer(ISerializer* s,const PointToPointMessageInfo& message)
529{
530 auto p2p_message = buildMessage(message);
531 return m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
532}
533
534/*---------------------------------------------------------------------------*/
535/*---------------------------------------------------------------------------*/
536
539{
540 if (m_stat)
541 m_stat->print(m_trace);
542}
543
544/*---------------------------------------------------------------------------*/
545/*---------------------------------------------------------------------------*/
546
548barrier()
549{
550 m_thread_barrier->wait();
551 if (m_local_rank==0)
552 m_mpi_parallel_mng->barrier();
553 m_thread_barrier->wait();
554}
555
556/*---------------------------------------------------------------------------*/
557/*---------------------------------------------------------------------------*/
558
559ISerializeMessageList* HybridParallelMng::
560_createSerializeMessageList()
561{
562 auto* x = new MP::internal::SerializeMessageList(messagePassingMng());
563 x->setAllowAnyRankReceive(false);
564 return x;
565}
566
567/*---------------------------------------------------------------------------*/
568/*---------------------------------------------------------------------------*/
569
572{
573 return m_utils_factory->createSynchronizer(this,family)._release();
574}
575
576/*---------------------------------------------------------------------------*/
577/*---------------------------------------------------------------------------*/
578
580createSynchronizer(const ItemGroup& group)
581{
582 return m_utils_factory->createSynchronizer(this,group)._release();
583}
584
585/*---------------------------------------------------------------------------*/
586/*---------------------------------------------------------------------------*/
587
590{
591 return m_utils_factory->createTopology(this)._release();
592}
593
594/*---------------------------------------------------------------------------*/
595/*---------------------------------------------------------------------------*/
596
598replication() const
599{
600 return m_replication;
601}
602
603/*---------------------------------------------------------------------------*/
604/*---------------------------------------------------------------------------*/
605
608{
609 delete m_replication;
610 m_replication = v;
611}
612
613/*---------------------------------------------------------------------------*/
614/*---------------------------------------------------------------------------*/
615
618{
619 return m_sequential_parallel_mng.get();
620}
621
622/*---------------------------------------------------------------------------*/
623/*---------------------------------------------------------------------------*/
624
625Ref<IParallelMng> HybridParallelMng::
626sequentialParallelMngRef()
627{
628 return m_sequential_parallel_mng;
629}
630
631/*---------------------------------------------------------------------------*/
632/*---------------------------------------------------------------------------*/
638{
640 public:
641 RequestList(HybridParallelMng* pm)
642 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
643 m_local_rank(m_parallel_mng->localRank()) {}
644 public:
645 void _wait(Parallel::eWaitType wait_type) override
646 {
647 switch(wait_type){
648 case Parallel::WaitAll:
649 m_parallel_mng->m_message_queue->waitAll(_requests());
650 break;
652 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),false);
653 break;
655 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),true);
656 }
657 }
658 private:
659 HybridParallelMng* m_parallel_mng;
660 HybridMessageQueue* m_message_queue;
662};
663
664/*---------------------------------------------------------------------------*/
665/*---------------------------------------------------------------------------*/
666
673
674/*---------------------------------------------------------------------------*/
675/*---------------------------------------------------------------------------*/
676
679{
680 m_message_queue->waitAll(requests);
681}
682
683/*---------------------------------------------------------------------------*/
684/*---------------------------------------------------------------------------*/
685
688{
689 return m_mpi_parallel_mng->getMPICommunicator();
690}
691
692/*---------------------------------------------------------------------------*/
693/*---------------------------------------------------------------------------*/
694
695MP::Communicator HybridParallelMng::
696communicator() const
697{
698 return m_mpi_parallel_mng->communicator();
699}
700
701/*---------------------------------------------------------------------------*/
702/*---------------------------------------------------------------------------*/
703
704MP::Communicator HybridParallelMng::
706{
707 return m_mpi_parallel_mng->machineCommunicator();
708}
709
710/*---------------------------------------------------------------------------*/
711/*---------------------------------------------------------------------------*/
712
715{
716 PointToPointMessageInfo p2p_message(message);
717 p2p_message.setEmiterRank(MessageRank(m_global_rank));
718 return p2p_message;
719}
720
721/*---------------------------------------------------------------------------*/
722/*---------------------------------------------------------------------------*/
723
726{
727 return buildMessage({MessageRank(dest),blocking_mode});
728}
729
730/*---------------------------------------------------------------------------*/
731/*---------------------------------------------------------------------------*/
732
733IParallelMng* HybridParallelMng::
734_createSubParallelMng(Int32ConstArrayView kept_ranks)
735{
736 ARCANE_UNUSED(kept_ranks);
737 ARCANE_THROW(NotSupportedException,"Use createSubParallelMngRef() instead");
738}
739
740/*---------------------------------------------------------------------------*/
741/*---------------------------------------------------------------------------*/
742
745{
746 // ATTENTION: Cette méthode est appelée simultanément par tous les threads
747 // partageant cet HybridParallelMng.
748
749 if (kept_ranks.empty())
750 ARCANE_FATAL("kept_ranks is empty");
751 ARCANE_CHECK_POINTER(m_sub_builder_factory);
752
753 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF";
754
755 /*
756 Il existe plusieurs possibilités:
757 1. on réduit juste le nombre de rangs en mémoire partagé pour chaque
758 processus MPI -< on créé un HybridParallelMng
759 2. On ne garde que le rang maitre de chaque processus MPI -> on créé un MpiParallelMng.
760 3. On ne garde que les rangs d'un même processus -> on créé un SharedMemoryParallelMng
761 4. On ne garde qu'un seul rang: on crée un MpiSequentialParallelMng.
762 */
763 // Pour l'instant, on ne supporte que le cas 1 et 2.
764 Int32 nb_kept_rank = kept_ranks.size();
765
766 // Détermine le nouveau nombre de rangs locaux par rang MPI.
767
768 // Regarde si je suis dans les listes des rangs conservés et si oui
769 // détermine mon rang dans le IParallelMng créé
770 Int32 first_global_rank_in_this_mpi = m_global_rank - m_local_rank;
771 Int32 last_global_rank_in_this_mpi = first_global_rank_in_this_mpi + m_local_nb_rank - 1;
772 // Mon nouveau rang local. Négatif si je ne suis pas dans le nouveau communicateur
773 Int32 my_new_global_rank = (-1);
774 Int32 new_local_nb_rank = 0;
775 Int32 my_new_local_rank = (-1);
776 for( Integer i=0; i<nb_kept_rank; ++i ){
777 Int32 kept_rank = kept_ranks[i];
778 if (kept_rank>=first_global_rank_in_this_mpi && kept_rank<last_global_rank_in_this_mpi)
779 ++new_local_nb_rank;
780 if (kept_rank==m_global_rank){
781 my_new_global_rank = i;
782 my_new_local_rank = new_local_nb_rank - 1;
783 }
784 }
785 bool has_new_rank = (my_new_global_rank != (-1));
786
787 // Calcule le min, le max et la somme sur tous les rangs du nombre de nouveaux.
788 // Deux cas peuvent se présenter:
789 // 1. Le min et le max sont égaux et supérieurs ou égaux à 2: Dans ce cas on créé
790 // un HybridParallelMng.
791 // 2. Le max vaut 1. Dans ce cas on créé un nouveau IParallelMng via le MpiParallelMng.
792 // Les rangs actuels pour lequels 'new_local_nb_rank' vaut 0 ne seront pas dans ce
793 // nouveau communicateur. Ce cas concerne aussi le cas où il ne reste plus qu'un
794 // seul rang à la fin.
795
796 Int32 min_new_local_nb_rank = -1;
797 Int32 max_new_local_nb_rank = -1;
798 Int32 sum_new_local_nb_rank = -1;
799 Int32 min_rank = A_NULL_RANK;
800 Int32 max_rank = A_NULL_RANK;
801 computeMinMaxSum(new_local_nb_rank,min_new_local_nb_rank,max_new_local_nb_rank,
802 sum_new_local_nb_rank,min_rank,max_rank);
803
804 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF new_local_nb_rank=" << new_local_nb_rank
805 << " min=" << min_new_local_nb_rank
806 << " max=" << max_new_local_nb_rank
807 << " sum=" << sum_new_local_nb_rank
808 << " new_global_rank=" << my_new_global_rank;
809
810 // S'il ne reste qu'un seul rang local, alors on construit uniquement un MpiParallelMng.
811 // Seul le PE qui a un nouveau rang est concerné et fait cela
812 if (max_new_local_nb_rank==1){
813 Integer nb_mpi_rank = m_mpi_parallel_mng->commSize();
814 // Il faut calculer les nouveaux rangs MPI.
815 // Si 'min_new_local_nb_rank' vaut 1, alors c'est simple car cela signifie qu'on garde
816 // tous les rangs MPI actuels (on fait l'équivalent d'un MPI_Comm_dup). Sinon, on
817 // récupère pour chaque rang MPI s'il sera dans le nouveau communicateur et on construit
818 // la liste des rangs conservés en fonction de cela.
819 // NOTE: dans tous les cas il faut faire attention qu'un seul thread utilise le
820 // 'm_mpi_parallel_mng'.
821 UniqueArray<Int32> kept_mpi_ranks;
823 bool do_mpi_call = false;
824 if (min_new_local_nb_rank==1){
825 if (has_new_rank){
826 do_mpi_call = true;
827 kept_mpi_ranks.resize(nb_mpi_rank);
828 for( Int32 x=0; x<nb_mpi_rank; ++x )
829 kept_mpi_ranks[x] = x;
830 }
831 }
832 else{
833 // Si je ne suis pas dans le nouveau communicateur, c'est le rang local 0 qui
834 // faut le 'gather'.
835 UniqueArray<Int16> gathered_ranks(nb_mpi_rank);
836 if (has_new_rank || m_local_rank==0){
837 do_mpi_call = true;
838 Int16 v = (has_new_rank) ? 1 : 0;
839 m_mpi_parallel_mng->allGather(ArrayView<Int16>(1,&v),gathered_ranks);
840 }
841 for( Int32 x=0; x<nb_mpi_rank; ++x )
842 if (gathered_ranks[x]==1)
843 kept_mpi_ranks.add(x);
844 }
845 if (do_mpi_call)
846 return m_mpi_parallel_mng->createSubParallelMngRef(kept_mpi_ranks);
847 else
848 return Ref<IParallelMng>();
849 }
850
851 if (max_new_local_nb_rank!=new_local_nb_rank)
852 ARCANE_FATAL("Not same number of new local ranks on every MPI processus: current={0} max={1}",
853 new_local_nb_rank,max_new_local_nb_rank);
854
855 if (max_new_local_nb_rank<2)
856 ARCANE_FATAL("number of local ranks is too low current={0} minimum=2",new_local_nb_rank);
857
858 // Met une barrière locale pour être sur que tout le monde attend ici.
859 m_thread_barrier->wait();
860
861 // NOTE: Le builder contient les parties communes aux IParallelMng créés. Il faut
862 // donc que ces derniers gardent une référence dessus sinon il sera détruit à la fin
863 // de cette méthode.
865
866 // Le rang 0 créé le builder
867 if (m_local_rank==0){
868 // Suppose qu'on à le même nombre de rangs MPI qu'avant donc on utilise
869 // le communicateur MPI qu'on a déjà.
870 MP::Communicator c = communicator();
871 MP::Communicator mc = machineCommunicator();
872 builder = m_sub_builder_factory->_createParallelMngBuilder(new_local_nb_rank, c, mc);
873 // Positionne le builder pour tout le monde
874 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
875 }
876 // Attend pour être sur que tous les threads voit le bon builder.
877 m_thread_barrier->wait();
878
879 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
880 ARCANE_CHECK_POINTER(builder.get());
881
882 Ref<IParallelMng> new_parallel_mng;
883 if (my_new_local_rank>=0){
884 new_parallel_mng = builder->_createParallelMng(my_new_local_rank,traceMng());
885 }
886 m_thread_barrier->wait();
887
888 // Ici, tout le monde a créé son IParallelMng. On peut donc
889 // supprimer la référence au builder. Les IParallelMng créés gardent
890 // une référence au builder
891 if (m_local_rank==0){
892 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
893 }
894 m_thread_barrier->wait();
895
896 return new_parallel_mng;
897}
898
899/*---------------------------------------------------------------------------*/
900/*---------------------------------------------------------------------------*/
901
904{
905 return m_utils_factory;
906}
907
908/*---------------------------------------------------------------------------*/
909/*---------------------------------------------------------------------------*/
910
911bool HybridParallelMng::
912_isAcceleratorAware() const
913{
914 return m_mpi_parallel_mng->_internalApi()->isAcceleratorAware();
915}
916
917/*---------------------------------------------------------------------------*/
918/*---------------------------------------------------------------------------*/
919
920} // End namespace Arcane::MessagePassing
921
922/*---------------------------------------------------------------------------*/
923/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Exception lorsqu'un argument est invalide.
Vue modifiable d'un tableau d'un type T.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
constexpr bool empty() const noexcept
true si le tableau est vide (size()==0)
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:36
Interface d'une famille d'entités.
Definition IItemFamily.h:84
Échange d'informations entre processeurs.
virtual bool isAcceleratorAware() const =0
Indique si l'implémentation gère les accélérateurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface du gestionnaire de traces.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Interface d'une file de messages avec les threads.
MemoryAllocationOptions machineShMemWinMemoryAllocator() override
Méthode permettant de récupérer un allocateur en mémoire partagée.
bool isMachineShMemWinAvailable() override
Méthode permettant de savoir si le mode mémoire partagée est supporté.
Ref< IContigMachineShMemWinBaseInternal > createContigMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Ref< IMachineShMemWinBaseInternal > createMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire dynamique sur le noeud.
Implémentation de IRequestList pour HybridParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant les threads.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
void freeRequests(ArrayView< Request > requests) override
Libère les requêtes.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void barrier() override
Effectue une barière.
MP::Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
bool m_is_initialized
true si déjà initialisé
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void build() override
Construit l'instance.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
void initialize() override
Initialise le gestionnaire du parallélisme.
IThreadMng * threadMng() const override
Gestionnaire de threads.
Int32 m_global_nb_rank
Nombre de rangs globaux.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
Int32 m_local_rank
Rang local du processeur actuel.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
IParallelReplication * replication() const override
Informations sur la réplication.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
MP::Communicator machineCommunicator() const override
Communicateur MPI issus du communicateur communicator() réunissant tous les processus du noeud de cal...
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Int32 m_local_nb_rank
Nombre de rang locaux.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
Int32 commRank() const override
Rang de cette instance dans le communicateur.
Int32 m_global_rank
Numéro du processeur actuel.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Integer waitMessages(Parallel::eWaitType wait_type) override
Attend que les messages aient terminé leur exécution.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
Interface du gestionnaire des échanges de messages.
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
void setEmiterRank(MessageRank rank)
Positionne le rang de l'émetteur du message.
Informations des buffers de réception.
Requête d'un message.
Definition Request.h:77
Classe de base d'une liste de requêtes.
IParallelMngInternal * _internalApi() override
API interne à Arcane.
Exception lorsqu'une fonction n'est pas implémentée.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Implémentation d'un tampon pour la sérialisation.
Vue d'un tableau d'éléments de type T.
Definition Span.h:633
Gestionnaire de timer.
Definition TimerMng.h:39
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
C void mpBroadcast(IMessagePassingMng *pm, Span< char > send_buf, Int32 rank)
@ WaitSome
Attend que tous les messages de la liste soient traités.
eBlockingType
Type indiquant si un message est bloquant ou non.
Implémentation de la concurrence.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
std::int8_t Int8
Type entier signé sur 8 bits.
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:451
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:121
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:482
std::int16_t Int16
Type entier signé sur 16 bits.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un HybridParallelMng.
Infos pour construire un SequentialParallelMng.