Arcane  v3.16.8.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
HybridParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* HybridParallelMng.cc (C) 2000-2025 */
9/* */
10/* Gestionnaire de parallélisme utilisant un mixte MPI/Threads. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/mpithread/HybridParallelMng.h"
15
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/NumericTypes.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/IThreadBarrier.h"
22#include "arcane/utils/ITraceMng.h"
23
24#include "arcane/core/IIOMng.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ISerializeMessageList.h"
27#include "arcane/core/IItemFamily.h"
28#include "arcane/core/internal/ParallelMngInternal.h"
29#include "arcane/core/internal/SerializeMessage.h"
30#include "arcane/core/parallel/IStat.h"
31
32#include "arcane/parallel/mpithread/HybridParallelDispatch.h"
33#include "arcane/parallel/mpithread/HybridMessageQueue.h"
34#include "arcane/parallel/mpithread/internal/HybridMachineMemoryWindowBaseInternalCreator.h"
35#include "arcane/parallel/mpithread/internal/HybridMachineMemoryWindowBaseInternal.h"
36#include "arcane/parallel/mpithread/internal/HybridDynamicMachineMemoryWindowBaseInternal.h"
37
38#include "arcane/parallel/mpi/MpiParallelMng.h"
39
40#include "arcane/impl/TimerMng.h"
41#include "arcane/impl/ParallelReplication.h"
42#include "arcane/impl/SequentialParallelMng.h"
43#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
44
46#include "arccore/message_passing/RequestListBase.h"
47#include "arccore/message_passing/SerializeMessageList.h"
48
49/*---------------------------------------------------------------------------*/
50/*---------------------------------------------------------------------------*/
51
52namespace Arcane
53{
54extern "C++" IIOMng*
55arcaneCreateIOMng(IParallelMng* psm);
56}
57
59{
60
61/*---------------------------------------------------------------------------*/
62/*---------------------------------------------------------------------------*/
63
64// NOTE: Cette classe n'est plus utilisée. Elle reste pour référence
65// et sera supprimée ultérieurement
66class HybridSerializeMessageList
68{
69 public:
70 class HybridSerializeMessageRequest
71 {
72 public:
73 HybridSerializeMessageRequest(ISerializeMessage* message,Request request)
74 : m_message(message), m_request(request){}
75 public:
76 ISerializeMessage* m_message = nullptr;
77 Request m_request;
78 };
79
80 public:
81
82 explicit HybridSerializeMessageList(HybridParallelMng* mpm)
83 : m_parallel_mng(mpm), m_trace(mpm->traceMng())
84 {
85 }
86
87 public:
88
89 void addMessage(ISerializeMessage* msg) override
90 {
91 m_messages_to_process.add(msg);
92
93 }
94 void processPendingMessages() override
95 {
96 }
97
99 {
100 switch(wait_type){
101 case Parallel::WaitAll:
102 // Pour l'instant seul le mode bloquant est supporté.
103 //m_parallel_mng->processMessages(m_messages_to_process);
104 _wait(Parallel::WaitAll);
105 m_messages_to_process.clear();
106 return (-1);
110 ARCANE_THROW(NotImplementedException,"WaitSomeNonBlocking");
111 }
112 return (-1);
113 }
114
115 private:
116
117 HybridParallelMng* m_parallel_mng;
118 ITraceMng* m_trace;
119 UniqueArray<ISerializeMessage*> m_messages_to_process;
120
121 void _waitAll();
122 void _wait(Parallel::eWaitType wait_mode);
123};
124
125/*---------------------------------------------------------------------------*/
126/*---------------------------------------------------------------------------*/
127
128void HybridSerializeMessageList::
129_wait(Parallel::eWaitType wait_mode)
130{
131 m_trace->info() << "BEGIN PROCESS MESSAGES";
132
133 // TODO: gérer la memoire sans faire de new.
134 ConstArrayView<ISerializeMessage*> messages = m_messages_to_process;
135 HybridMessageQueue* message_queue = m_parallel_mng->m_message_queue;
136 UniqueArray<Request> all_requests;
137 MessageTag HYBRID_MESSAGE_TAG(511);
138 for( ISerializeMessage* sm : messages ){
139 ISerializer* s = sm->serializer();
140 MessageRank orig(sm->source());
141 MessageRank dest(sm->destination());
142 PointToPointMessageInfo message_info(orig,dest,HYBRID_MESSAGE_TAG,Parallel::NonBlocking);
143 Request r;
144 if (sm->isSend())
145 r = message_queue->addSend(message_info,SendBufferInfo(s));
146 else
147 r = message_queue->addReceive(message_info,ReceiveBufferInfo(s));
148 all_requests.add(r);
149 }
150
151 if (wait_mode==Parallel::WaitAll)
152 message_queue->waitAll(all_requests);
153
154 for( ISerializeMessage* sm : messages )
155 sm->setFinished(true);
156}
157
158/*---------------------------------------------------------------------------*/
159/*---------------------------------------------------------------------------*/
160
161/*---------------------------------------------------------------------------*/
162/*---------------------------------------------------------------------------*/
163
165: public ParallelMngInternal
166{
167 public:
168
169 explicit Impl(HybridParallelMng* pm, HybridMachineMemoryWindowBaseInternalCreator* window_creator)
170 : ParallelMngInternal(pm)
171 , m_parallel_mng(pm)
172 , m_window_creator(window_creator)
173 {}
174
175 ~Impl() override = default;
176
177 public:
178
180 {
181 return makeRef(m_window_creator->createWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
182 }
183
185 {
186 return makeRef(m_window_creator->createDynamicWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
187 }
188
189 private:
190
191 HybridParallelMng* m_parallel_mng;
193};
194
195/*---------------------------------------------------------------------------*/
196/*---------------------------------------------------------------------------*/
197
198/*---------------------------------------------------------------------------*/
199/*---------------------------------------------------------------------------*/
200
201HybridParallelMng::
202HybridParallelMng(const HybridParallelMngBuildInfo& bi)
203: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.local_rank,bi.local_nb_rank))
204, m_trace(bi.trace_mng)
205, m_thread_mng(bi.thread_mng)
206, m_world_parallel_mng(bi.world_parallel_mng)
207, m_io_mng(nullptr)
208, m_timer_mng(nullptr)
209, m_replication(new ParallelReplication())
210, m_message_queue(new HybridMessageQueue(bi.message_queue,bi.mpi_parallel_mng,bi.local_nb_rank))
211, m_is_initialized(false)
212, m_stat(Parallel::createDefaultStat())
213, m_thread_barrier(bi.thread_barrier)
214, m_mpi_parallel_mng(bi.mpi_parallel_mng)
215, m_all_dispatchers(bi.all_dispatchers)
216, m_sub_builder_factory(bi.sub_builder_factory)
217, m_parent_container_ref(bi.container)
218, m_utils_factory(createRef<ParallelMngUtilsFactoryBase>())
219, m_parallel_mng_internal(new Impl(this, bi.window_creator))
220{
221 if (!m_world_parallel_mng)
222 m_world_parallel_mng = this;
223
224 // TODO: vérifier que tous les autres HybridParallelMng ont bien
225 // le même nombre de rang locaux (m_local_nb_rank)
226 m_local_rank = bi.local_rank;
227 m_local_nb_rank = bi.local_nb_rank;
228
229 Int32 mpi_rank = m_mpi_parallel_mng->commRank();
230 Int32 mpi_size = m_mpi_parallel_mng->commSize();
231
234
235 m_is_parallel = m_global_nb_rank!=1;
236}
237
238/*---------------------------------------------------------------------------*/
239/*---------------------------------------------------------------------------*/
240
241HybridParallelMng::
242~HybridParallelMng()
243{
244 m_sequential_parallel_mng.reset();
245 delete m_replication;
246 delete m_io_mng;
247 delete m_message_queue;
248 delete m_timer_mng;
249 delete m_stat;
250 delete m_mpi_parallel_mng;
251 delete m_parallel_mng_internal;
252}
253
254/*---------------------------------------------------------------------------*/
255/*---------------------------------------------------------------------------*/
256
257namespace
258{
259// Classe pour créer les différents dispatchers
260class DispatchCreator
261{
262 public:
263 DispatchCreator(ITraceMng* tm,HybridParallelMng* mpm,HybridMessageQueue* message_queue,MpiThreadAllDispatcher* all_dispatchers)
264 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue), m_all_dispatchers(all_dispatchers){}
265 public:
266 template<typename DataType> HybridParallelDispatch<DataType>*
267 create()
268 {
269 HybridMessageQueue* tmq = m_message_queue;
270 MpiThreadAllDispatcher* ad = m_all_dispatchers;
271 auto field = ad->instance((DataType*)nullptr).view();
272 return new HybridParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
273 }
274
275 ITraceMng* m_tm;
276 HybridParallelMng* m_mpm;
277 HybridMessageQueue* m_message_queue;
278 MpiThreadAllDispatcher* m_all_dispatchers;
279};
280}
281
282/*---------------------------------------------------------------------------*/
283/*---------------------------------------------------------------------------*/
284
286build()
287{
288 ITraceMng* tm = traceMng();
289 tm->info() << "Initialise HybridParallelMng"
290 << " global_rank=" << m_global_rank
291 << " local_rank=" << m_local_rank
292 << " mpi_rank=" << m_mpi_parallel_mng->commRank();
293
294 m_timer_mng = new TimerMng(tm);
295
296 // Créé le gestionnaire séquentiel associé.
297 {
299 bi.setTraceMng(traceMng());
300 bi.setCommunicator(communicator());
301 bi.setThreadMng(threadMng());
302 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
303 }
304
305 DispatchCreator creator(m_trace,this,m_message_queue,m_all_dispatchers);
306 this->createDispatchers(creator);
307 m_io_mng = arcaneCreateIOMng(this);
308}
309
310/*---------------------------------------------------------------------------*/
311/*---------------------------------------------------------------------------*/
312
313/*----------------------------------------------------------------------------*/
314/*---------------------------------------------------------------------------*/
315
318{
319 Trace::Setter mci(m_trace,"Thread");
320 if (m_is_initialized){
321 m_trace->warning() << "HybridParallelMng already initialized";
322 return;
323 }
324
325 m_is_initialized = true;
326}
327
328/*---------------------------------------------------------------------------*/
329/*---------------------------------------------------------------------------*/
330
331SerializeBuffer* HybridParallelMng::
332_castSerializer(ISerializer* serializer)
333{
334 auto sbuf = dynamic_cast<SerializeBuffer*>(serializer);
335 if (!sbuf)
336 ARCANE_THROW(ArgumentException,"can not cast 'ISerializer' to 'SerializeBuffer'");
337 return sbuf;
338}
339
340/*---------------------------------------------------------------------------*/
341/*---------------------------------------------------------------------------*/
342
345{
346 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
347}
348
351{
352 return m_utils_factory->createTransferValuesOperation(this)._release();
353}
354
357{
358 return m_utils_factory->createExchanger(this)._release();
359}
360
361/*---------------------------------------------------------------------------*/
362/*---------------------------------------------------------------------------*/
363
364/*---------------------------------------------------------------------------*/
365/*---------------------------------------------------------------------------*/
366
367void HybridParallelMng::
368sendSerializer(ISerializer* s,Int32 rank)
369{
370 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
371 Request r = m_message_queue->addSend(p2p_message,s);
372 m_message_queue->waitAll(ArrayView<Request>(1,&r));
373}
374
375/*---------------------------------------------------------------------------*/
376/*---------------------------------------------------------------------------*/
377
378auto HybridParallelMng::
379sendSerializer(ISerializer* s,Int32 rank,ByteArray& bytes) -> Request
380{
381 ARCANE_UNUSED(bytes);
382 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
383 return m_message_queue->addSend(p2p_message,s);
384}
385
386/*---------------------------------------------------------------------------*/
387/*---------------------------------------------------------------------------*/
388
391{
392 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
393}
394
395/*---------------------------------------------------------------------------*/
396/*---------------------------------------------------------------------------*/
397
398void HybridParallelMng::
399broadcastSerializer(ISerializer* values,Int32 rank)
400{
401 Timer::Phase tphase(timeStats(),TP_Communication);
402 SerializeBuffer* sbuf = _castSerializer(values);
403
404 bool is_broadcaster = (rank==commRank());
405
406 // Effectue l'envoie en deux phases. Envoie d'abord le nombre d'éléments
407 // puis envoie les éléments.
408 // TODO: il serait possible de le faire en une fois pour les messages
409 // ne dépassant pas une certaine taille.
410
412 if (is_broadcaster){
413 Int64 total_size = sbuf->totalSize();
414 Span<Byte> bytes = sbuf->globalBuffer();
415 this->broadcast(Int64ArrayView(1,&total_size),rank);
416 mpBroadcast(mpm,bytes,rank);
417 }
418 else{
419 Int64 total_size = 0;
420 this->broadcast(Int64ArrayView(1,&total_size),rank);
421 sbuf->preallocate(total_size);
422 Span<Byte> bytes = sbuf->globalBuffer();
423 mpBroadcast(mpm,bytes,rank);
424 sbuf->setFromSizes();
425 }
426}
427
428/*---------------------------------------------------------------------------*/
429/*---------------------------------------------------------------------------*/
430
431void HybridParallelMng::
432recvSerializer(ISerializer* s,Int32 rank)
433{
434 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
435 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
436 m_message_queue->waitAll(ArrayView<Request>(1,&r));
437}
438
439/*---------------------------------------------------------------------------*/
440/*---------------------------------------------------------------------------*/
441
444{
445 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
446}
447
448/*---------------------------------------------------------------------------*/
449/*---------------------------------------------------------------------------*/
450
453{
454 ARCANE_UNUSED(requests);
455 throw NotImplementedException(A_FUNCINFO);
456}
457
458/*---------------------------------------------------------------------------*/
459/*---------------------------------------------------------------------------*/
460
462probe(const PointToPointMessageInfo& message)
463{
464 PointToPointMessageInfo p2p_message(message);
466 return m_message_queue->probe(p2p_message);
467}
468
469/*---------------------------------------------------------------------------*/
470/*---------------------------------------------------------------------------*/
471
474{
475 PointToPointMessageInfo p2p_message(message);
477 return m_message_queue->legacyProbe(p2p_message);
478}
479
480/*---------------------------------------------------------------------------*/
481/*---------------------------------------------------------------------------*/
482
483Request HybridParallelMng::
484sendSerializer(const ISerializer* s,const PointToPointMessageInfo& message)
485{
486 auto p2p_message = buildMessage(message);
487 return m_message_queue->addSend(p2p_message,s);
488}
489
490/*---------------------------------------------------------------------------*/
491/*---------------------------------------------------------------------------*/
492
493Request HybridParallelMng::
494receiveSerializer(ISerializer* s,const PointToPointMessageInfo& message)
495{
496 auto p2p_message = buildMessage(message);
497 return m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
498}
499
500/*---------------------------------------------------------------------------*/
501/*---------------------------------------------------------------------------*/
502
505{
506 if (m_stat)
507 m_stat->print(m_trace);
508}
509
510/*---------------------------------------------------------------------------*/
511/*---------------------------------------------------------------------------*/
512
514barrier()
515{
516 m_thread_barrier->wait();
517 if (m_local_rank==0)
518 m_mpi_parallel_mng->barrier();
519 m_thread_barrier->wait();
520}
521
522/*---------------------------------------------------------------------------*/
523/*---------------------------------------------------------------------------*/
524
525ISerializeMessageList* HybridParallelMng::
526_createSerializeMessageList()
527{
528 auto* x = new MP::internal::SerializeMessageList(messagePassingMng());
529 x->setAllowAnyRankReceive(false);
530 return x;
531}
532
533/*---------------------------------------------------------------------------*/
534/*---------------------------------------------------------------------------*/
535
538{
539 return m_utils_factory->createSynchronizer(this,family)._release();
540}
541
542/*---------------------------------------------------------------------------*/
543/*---------------------------------------------------------------------------*/
544
546createSynchronizer(const ItemGroup& group)
547{
548 return m_utils_factory->createSynchronizer(this,group)._release();
549}
550
551/*---------------------------------------------------------------------------*/
552/*---------------------------------------------------------------------------*/
553
556{
557 return m_utils_factory->createTopology(this)._release();
558}
559
560/*---------------------------------------------------------------------------*/
561/*---------------------------------------------------------------------------*/
562
564replication() const
565{
566 return m_replication;
567}
568
569/*---------------------------------------------------------------------------*/
570/*---------------------------------------------------------------------------*/
571
574{
575 delete m_replication;
576 m_replication = v;
577}
578
579/*---------------------------------------------------------------------------*/
580/*---------------------------------------------------------------------------*/
581
584{
585 return m_sequential_parallel_mng.get();
586}
587
588/*---------------------------------------------------------------------------*/
589/*---------------------------------------------------------------------------*/
590
591Ref<IParallelMng> HybridParallelMng::
592sequentialParallelMngRef()
593{
594 return m_sequential_parallel_mng;
595}
596
597/*---------------------------------------------------------------------------*/
598/*---------------------------------------------------------------------------*/
604{
606 public:
607 RequestList(HybridParallelMng* pm)
608 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
609 m_local_rank(m_parallel_mng->localRank()) {}
610 public:
611 void _wait(Parallel::eWaitType wait_type) override
612 {
613 switch(wait_type){
614 case Parallel::WaitAll:
615 m_parallel_mng->m_message_queue->waitAll(_requests());
616 break;
618 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),false);
619 break;
621 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),true);
622 }
623 }
624 private:
625 HybridParallelMng* m_parallel_mng;
626 HybridMessageQueue* m_message_queue;
628};
629
630/*---------------------------------------------------------------------------*/
631/*---------------------------------------------------------------------------*/
632
639
640/*---------------------------------------------------------------------------*/
641/*---------------------------------------------------------------------------*/
642
645{
646 m_message_queue->waitAll(requests);
647}
648
649/*---------------------------------------------------------------------------*/
650/*---------------------------------------------------------------------------*/
651
654{
655 return m_mpi_parallel_mng->getMPICommunicator();
656}
657
658/*---------------------------------------------------------------------------*/
659/*---------------------------------------------------------------------------*/
660
661MP::Communicator HybridParallelMng::
662communicator() const
663{
664 return m_mpi_parallel_mng->communicator();
665}
666
667/*---------------------------------------------------------------------------*/
668/*---------------------------------------------------------------------------*/
669
672{
673 PointToPointMessageInfo p2p_message(message);
674 p2p_message.setEmiterRank(MessageRank(m_global_rank));
675 return p2p_message;
676}
677
678/*---------------------------------------------------------------------------*/
679/*---------------------------------------------------------------------------*/
680
683{
684 return buildMessage({MessageRank(dest),blocking_mode});
685}
686
687/*---------------------------------------------------------------------------*/
688/*---------------------------------------------------------------------------*/
689
690IParallelMng* HybridParallelMng::
691_createSubParallelMng(Int32ConstArrayView kept_ranks)
692{
693 ARCANE_UNUSED(kept_ranks);
694 ARCANE_THROW(NotSupportedException,"Use createSubParallelMngRef() instead");
695}
696
697/*---------------------------------------------------------------------------*/
698/*---------------------------------------------------------------------------*/
699
702{
703 // ATTENTION: Cette méthode est appelée simultanément par tous les threads
704 // partageant cet HybridParallelMng.
705
706 if (kept_ranks.empty())
707 ARCANE_FATAL("kept_ranks is empty");
708 ARCANE_CHECK_POINTER(m_sub_builder_factory);
709
710 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF";
711
712 /*
713 Il existe plusieurs possibilités:
714 1. on réduit juste le nombre de rangs en mémoire partagé pour chaque
715 processus MPI -< on créé un HybridParallelMng
716 2. On ne garde que le rang maitre de chaque processus MPI -> on créé un MpiParallelMng.
717 3. On ne garde que les rangs d'un même processus -> on créé un SharedMemoryParallelMng
718 4. On ne garde qu'un seul rang: on crée un MpiSequentialParallelMng.
719 */
720 // Pour l'instant, on ne supporte que le cas 1 et 2.
721 Int32 nb_kept_rank = kept_ranks.size();
722
723 // Détermine le nouveau nombre de rangs locaux par rang MPI.
724
725 // Regarde si je suis dans les listes des rangs conservés et si oui
726 // détermine mon rang dans le IParallelMng créé
727 Int32 first_global_rank_in_this_mpi = m_global_rank - m_local_rank;
728 Int32 last_global_rank_in_this_mpi = first_global_rank_in_this_mpi + m_local_nb_rank - 1;
729 // Mon nouveau rang local. Négatif si je ne suis pas dans le nouveau communicateur
730 Int32 my_new_global_rank = (-1);
731 Int32 new_local_nb_rank = 0;
732 Int32 my_new_local_rank = (-1);
733 for( Integer i=0; i<nb_kept_rank; ++i ){
734 Int32 kept_rank = kept_ranks[i];
735 if (kept_rank>=first_global_rank_in_this_mpi && kept_rank<last_global_rank_in_this_mpi)
736 ++new_local_nb_rank;
737 if (kept_rank==m_global_rank){
738 my_new_global_rank = i;
739 my_new_local_rank = new_local_nb_rank - 1;
740 }
741 }
742 bool has_new_rank = (my_new_global_rank != (-1));
743
744 // Calcule le min, le max et la somme sur tous les rangs du nombre de nouveaux.
745 // Deux cas peuvent se présenter:
746 // 1. Le min et le max sont égaux et supérieurs ou égaux à 2: Dans ce cas on créé
747 // un HybridParallelMng.
748 // 2. Le max vaut 1. Dans ce cas on créé un nouveau IParallelMng via le MpiParallelMng.
749 // Les rangs actuels pour lequels 'new_local_nb_rank' vaut 0 ne seront pas dans ce
750 // nouveau communicateur. Ce cas concerne aussi le cas où il ne reste plus qu'un
751 // seul rang à la fin.
752
753 Int32 min_new_local_nb_rank = -1;
754 Int32 max_new_local_nb_rank = -1;
755 Int32 sum_new_local_nb_rank = -1;
756 Int32 min_rank = A_NULL_RANK;
757 Int32 max_rank = A_NULL_RANK;
758 computeMinMaxSum(new_local_nb_rank,min_new_local_nb_rank,max_new_local_nb_rank,
759 sum_new_local_nb_rank,min_rank,max_rank);
760
761 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF new_local_nb_rank=" << new_local_nb_rank
762 << " min=" << min_new_local_nb_rank
763 << " max=" << max_new_local_nb_rank
764 << " sum=" << sum_new_local_nb_rank
765 << " new_global_rank=" << my_new_global_rank;
766
767 // S'il ne reste qu'un seul rang local, alors on construit uniquement un MpiParallelMng.
768 // Seul le PE qui a un nouveau rang est concerné et fait cela
769 if (max_new_local_nb_rank==1){
770 Integer nb_mpi_rank = m_mpi_parallel_mng->commSize();
771 // Il faut calculer les nouveaux rangs MPI.
772 // Si 'min_new_local_nb_rank' vaut 1, alors c'est simple car cela signifie qu'on garde
773 // tous les rangs MPI actuels (on fait l'équivalent d'un MPI_Comm_dup). Sinon, on
774 // récupère pour chaque rang MPI s'il sera dans le nouveau communicateur et on construit
775 // la liste des rangs conservés en fonction de cela.
776 // NOTE: dans tous les cas il faut faire attention qu'un seul thread utilise le
777 // 'm_mpi_parallel_mng'.
778 UniqueArray<Int32> kept_mpi_ranks;
780 bool do_mpi_call = false;
781 if (min_new_local_nb_rank==1){
782 if (has_new_rank){
783 do_mpi_call = true;
784 kept_mpi_ranks.resize(nb_mpi_rank);
785 for( Int32 x=0; x<nb_mpi_rank; ++x )
786 kept_mpi_ranks[x] = x;
787 }
788 }
789 else{
790 // Si je ne suis pas dans le nouveau communicateur, c'est le rang local 0 qui
791 // faut le 'gather'.
792 UniqueArray<Int16> gathered_ranks(nb_mpi_rank);
793 if (has_new_rank || m_local_rank==0){
794 do_mpi_call = true;
795 Int16 v = (has_new_rank) ? 1 : 0;
796 m_mpi_parallel_mng->allGather(ArrayView<Int16>(1,&v),gathered_ranks);
797 }
798 for( Int32 x=0; x<nb_mpi_rank; ++x )
799 if (gathered_ranks[x]==1)
800 kept_mpi_ranks.add(x);
801 }
802 if (do_mpi_call)
803 return m_mpi_parallel_mng->createSubParallelMngRef(kept_mpi_ranks);
804 else
805 return Ref<IParallelMng>();
806 }
807
808 if (max_new_local_nb_rank!=new_local_nb_rank)
809 ARCANE_FATAL("Not same number of new local ranks on every MPI processus: current={0} max={1}",
810 new_local_nb_rank,max_new_local_nb_rank);
811
812 if (max_new_local_nb_rank<2)
813 ARCANE_FATAL("number of local ranks is too low current={0} minimum=2",new_local_nb_rank);
814
815 // Met une barrière locale pour être sur que tout le monde attend ici.
816 m_thread_barrier->wait();
817
818 // NOTE: Le builder contient les parties communes aux IParallelMng créés. Il faut
819 // donc que ces derniers gardent une référence dessus sinon il sera détruit à la fin
820 // de cette méthode.
822
823 // Le rang 0 créé le builder
824 if (m_local_rank==0){
825 // Suppose qu'on à le même nombre de rangs MPI qu'avant donc on utilise
826 // le communicateur MPI qu'on a déjà.
827 MP::Communicator c = communicator();
828 builder = m_sub_builder_factory->_createParallelMngBuilder(new_local_nb_rank,c);
829 // Positionne le builder pour tout le monde
830 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
831 }
832 // Attend pour être sur que tous les threads voit le bon builder.
833 m_thread_barrier->wait();
834
835 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
836 ARCANE_CHECK_POINTER(builder.get());
837
838 Ref<IParallelMng> new_parallel_mng;
839 if (my_new_local_rank>=0){
840 new_parallel_mng = builder->_createParallelMng(my_new_local_rank,traceMng());
841 }
842 m_thread_barrier->wait();
843
844 // Ici, tout le monde a créé son IParallelMng. On peut donc
845 // supprimer la référence au builder. Les IParallelMng créés gardent
846 // une référence au builder
847 if (m_local_rank==0){
848 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
849 }
850 m_thread_barrier->wait();
851
852 return new_parallel_mng;
853}
854
855/*---------------------------------------------------------------------------*/
856/*---------------------------------------------------------------------------*/
857
860{
861 return m_utils_factory;
862}
863
864/*---------------------------------------------------------------------------*/
865/*---------------------------------------------------------------------------*/
866
867bool HybridParallelMng::
868_isAcceleratorAware() const
869{
870 return m_mpi_parallel_mng->_internalApi()->isAcceleratorAware();
871}
872
873/*---------------------------------------------------------------------------*/
874/*---------------------------------------------------------------------------*/
875
876} // End namespace Arcane::MessagePassing
877
878/*---------------------------------------------------------------------------*/
879/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Exception lorsqu'un argument est invalide.
Vue modifiable d'un tableau d'un type T.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
constexpr bool empty() const noexcept
true si le tableau est vide (size()==0)
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:36
Interface d'une famille d'entités.
Definition IItemFamily.h:84
Échange d'informations entre processeurs.
virtual bool isAcceleratorAware() const =0
Indique si l'implémentation gère les accélérateurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface du gestionnaire de traces.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Interface d'une file de messages avec les threads.
Ref< IMachineMemoryWindowBaseInternal > createMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Ref< IDynamicMachineMemoryWindowBaseInternal > createDynamicMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire dynamique sur le noeud.
Implémentation de IRequestList pour HybridParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant les threads.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
void freeRequests(ArrayView< Request > requests) override
Libère les requêtes.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void barrier() override
Effectue une barière.
MP::Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
bool m_is_initialized
true si déjà initialisé
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void build() override
Construit l'instance.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
void initialize() override
Initialise le gestionnaire du parallélisme.
IThreadMng * threadMng() const override
Gestionnaire de threads.
Int32 m_global_nb_rank
Nombre de rangs globaux.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
Int32 m_local_rank
Rang local du processeur actuel.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
IParallelReplication * replication() const override
Informations sur la réplication.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Int32 m_local_nb_rank
Nombre de rang locaux.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
Int32 commRank() const override
Rang de cette instance dans le communicateur.
Int32 m_global_rank
Numéro du processeur actuel.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Integer waitMessages(Parallel::eWaitType wait_type) override
Attend que les messages aient terminé leur exécution.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
Interface du gestionnaire des échanges de messages.
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
void setEmiterRank(MessageRank rank)
Positionne le rang de l'émetteur du message.
Informations des buffers de réception.
Requête d'un message.
Definition Request.h:77
Classe de base d'une liste de requêtes.
IParallelMngInternal * _internalApi() override
API interne à Arcane.
Exception lorsqu'une fonction n'est pas implémentée.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Implémentation d'un tampon pour la sérialisation.
Vue d'un tableau d'éléments de type T.
Definition Span.h:513
Gestionnaire de timer.
Definition TimerMng.h:39
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
C void mpBroadcast(IMessagePassingMng *pm, Span< char > send_buf, Int32 rank)
@ WaitSome
Attend que tous les messages de la liste soient traités.
eBlockingType
Type indiquant si un message est bloquant ou non.
Implémentation de la concurrence.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:538
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:208
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:569
std::int16_t Int16
Type entier signé sur 16 bits.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un HybridParallelMng.
Infos pour construire un SequentialParallelMng.