Arcane  v3.16.7.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
HybridParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* HybridParallelMng.cc (C) 2000-2025 */
9/* */
10/* Gestionnaire de parallélisme utilisant un mixte MPI/Threads. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/mpithread/HybridParallelMng.h"
15
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/NumericTypes.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/IThreadBarrier.h"
22#include "arcane/utils/ITraceMng.h"
23
24#include "arcane/core/IIOMng.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ISerializeMessageList.h"
27#include "arcane/core/IItemFamily.h"
28#include "arcane/core/internal/ParallelMngInternal.h"
29#include "arcane/core/internal/SerializeMessage.h"
30#include "arcane/core/parallel/IStat.h"
31
32#include "arcane/parallel/mpithread/HybridParallelDispatch.h"
33#include "arcane/parallel/mpithread/HybridMessageQueue.h"
34#include "arcane/parallel/mpithread/internal/HybridMachineMemoryWindowBaseInternalCreator.h"
35#include "arcane/parallel/mpithread/internal/HybridMachineMemoryWindowBaseInternal.h"
36
37#include "arcane/parallel/mpi/MpiParallelMng.h"
38
39#include "arcane/impl/TimerMng.h"
40#include "arcane/impl/ParallelReplication.h"
41#include "arcane/impl/SequentialParallelMng.h"
42#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
43
45#include "arccore/message_passing/RequestListBase.h"
46#include "arccore/message_passing/SerializeMessageList.h"
47
48/*---------------------------------------------------------------------------*/
49/*---------------------------------------------------------------------------*/
50
51namespace Arcane
52{
53extern "C++" IIOMng*
54arcaneCreateIOMng(IParallelMng* psm);
55}
56
58{
59
60/*---------------------------------------------------------------------------*/
61/*---------------------------------------------------------------------------*/
62
63// NOTE: Cette classe n'est plus utilisée. Elle reste pour référence
64// et sera supprimée ultérieurement
65class HybridSerializeMessageList
67{
68 public:
69 class HybridSerializeMessageRequest
70 {
71 public:
72 HybridSerializeMessageRequest(ISerializeMessage* message,Request request)
73 : m_message(message), m_request(request){}
74 public:
75 ISerializeMessage* m_message = nullptr;
76 Request m_request;
77 };
78
79 public:
80
81 explicit HybridSerializeMessageList(HybridParallelMng* mpm)
82 : m_parallel_mng(mpm), m_trace(mpm->traceMng())
83 {
84 }
85
86 public:
87
88 void addMessage(ISerializeMessage* msg) override
89 {
90 m_messages_to_process.add(msg);
91
92 }
93 void processPendingMessages() override
94 {
95 }
96
98 {
99 switch(wait_type){
100 case Parallel::WaitAll:
101 // Pour l'instant seul le mode bloquant est supporté.
102 //m_parallel_mng->processMessages(m_messages_to_process);
103 _wait(Parallel::WaitAll);
104 m_messages_to_process.clear();
105 return (-1);
109 ARCANE_THROW(NotImplementedException,"WaitSomeNonBlocking");
110 }
111 return (-1);
112 }
113
114 private:
115
116 HybridParallelMng* m_parallel_mng;
117 ITraceMng* m_trace;
118 UniqueArray<ISerializeMessage*> m_messages_to_process;
119
120 void _waitAll();
121 void _wait(Parallel::eWaitType wait_mode);
122};
123
124/*---------------------------------------------------------------------------*/
125/*---------------------------------------------------------------------------*/
126
127void HybridSerializeMessageList::
128_wait(Parallel::eWaitType wait_mode)
129{
130 m_trace->info() << "BEGIN PROCESS MESSAGES";
131
132 // TODO: gérer la memoire sans faire de new.
133 ConstArrayView<ISerializeMessage*> messages = m_messages_to_process;
134 HybridMessageQueue* message_queue = m_parallel_mng->m_message_queue;
135 UniqueArray<Request> all_requests;
136 MessageTag HYBRID_MESSAGE_TAG(511);
137 for( ISerializeMessage* sm : messages ){
138 ISerializer* s = sm->serializer();
139 MessageRank orig(sm->source());
140 MessageRank dest(sm->destination());
141 PointToPointMessageInfo message_info(orig,dest,HYBRID_MESSAGE_TAG,Parallel::NonBlocking);
142 Request r;
143 if (sm->isSend())
144 r = message_queue->addSend(message_info,SendBufferInfo(s));
145 else
146 r = message_queue->addReceive(message_info,ReceiveBufferInfo(s));
147 all_requests.add(r);
148 }
149
150 if (wait_mode==Parallel::WaitAll)
151 message_queue->waitAll(all_requests);
152
153 for( ISerializeMessage* sm : messages )
154 sm->setFinished(true);
155}
156
157/*---------------------------------------------------------------------------*/
158/*---------------------------------------------------------------------------*/
159
160/*---------------------------------------------------------------------------*/
161/*---------------------------------------------------------------------------*/
162
164: public ParallelMngInternal
165{
166 public:
167
168 explicit Impl(HybridParallelMng* pm, HybridMachineMemoryWindowBaseInternalCreator* window_creator)
169 : ParallelMngInternal(pm)
170 , m_parallel_mng(pm)
171 , m_window_creator(window_creator)
172 {}
173
174 ~Impl() override = default;
175
176 public:
177
179 {
180 return makeRef(m_window_creator->createWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
181 }
182
183 private:
184
185 HybridParallelMng* m_parallel_mng;
187};
188
189/*---------------------------------------------------------------------------*/
190/*---------------------------------------------------------------------------*/
191
192/*---------------------------------------------------------------------------*/
193/*---------------------------------------------------------------------------*/
194
195HybridParallelMng::
196HybridParallelMng(const HybridParallelMngBuildInfo& bi)
197: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.local_rank,bi.local_nb_rank))
198, m_trace(bi.trace_mng)
199, m_thread_mng(bi.thread_mng)
200, m_world_parallel_mng(bi.world_parallel_mng)
201, m_io_mng(nullptr)
202, m_timer_mng(nullptr)
203, m_replication(new ParallelReplication())
204, m_message_queue(new HybridMessageQueue(bi.message_queue,bi.mpi_parallel_mng,bi.local_nb_rank))
205, m_is_initialized(false)
206, m_stat(Parallel::createDefaultStat())
207, m_thread_barrier(bi.thread_barrier)
208, m_mpi_parallel_mng(bi.mpi_parallel_mng)
209, m_all_dispatchers(bi.all_dispatchers)
210, m_sub_builder_factory(bi.sub_builder_factory)
211, m_parent_container_ref(bi.container)
212, m_utils_factory(createRef<ParallelMngUtilsFactoryBase>())
213, m_parallel_mng_internal(new Impl(this, bi.window_creator))
214{
215 if (!m_world_parallel_mng)
216 m_world_parallel_mng = this;
217
218 // TODO: vérifier que tous les autres HybridParallelMng ont bien
219 // le même nombre de rang locaux (m_local_nb_rank)
220 m_local_rank = bi.local_rank;
221 m_local_nb_rank = bi.local_nb_rank;
222
223 Int32 mpi_rank = m_mpi_parallel_mng->commRank();
224 Int32 mpi_size = m_mpi_parallel_mng->commSize();
225
228
229 m_is_parallel = m_global_nb_rank!=1;
230}
231
232/*---------------------------------------------------------------------------*/
233/*---------------------------------------------------------------------------*/
234
235HybridParallelMng::
236~HybridParallelMng()
237{
238 m_sequential_parallel_mng.reset();
239 delete m_replication;
240 delete m_io_mng;
241 delete m_message_queue;
242 delete m_timer_mng;
243 delete m_stat;
244 delete m_mpi_parallel_mng;
245 delete m_parallel_mng_internal;
246}
247
248/*---------------------------------------------------------------------------*/
249/*---------------------------------------------------------------------------*/
250
251namespace
252{
253// Classe pour créer les différents dispatchers
254class DispatchCreator
255{
256 public:
257 DispatchCreator(ITraceMng* tm,HybridParallelMng* mpm,HybridMessageQueue* message_queue,MpiThreadAllDispatcher* all_dispatchers)
258 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue), m_all_dispatchers(all_dispatchers){}
259 public:
260 template<typename DataType> HybridParallelDispatch<DataType>*
261 create()
262 {
263 HybridMessageQueue* tmq = m_message_queue;
264 MpiThreadAllDispatcher* ad = m_all_dispatchers;
265 auto field = ad->instance((DataType*)nullptr).view();
266 return new HybridParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
267 }
268
269 ITraceMng* m_tm;
270 HybridParallelMng* m_mpm;
271 HybridMessageQueue* m_message_queue;
272 MpiThreadAllDispatcher* m_all_dispatchers;
273};
274}
275
276/*---------------------------------------------------------------------------*/
277/*---------------------------------------------------------------------------*/
278
280build()
281{
282 ITraceMng* tm = traceMng();
283 tm->info() << "Initialise HybridParallelMng"
284 << " global_rank=" << m_global_rank
285 << " local_rank=" << m_local_rank
286 << " mpi_rank=" << m_mpi_parallel_mng->commRank();
287
288 m_timer_mng = new TimerMng(tm);
289
290 // Créé le gestionnaire séquentiel associé.
291 {
293 bi.setTraceMng(traceMng());
294 bi.setCommunicator(communicator());
295 bi.setThreadMng(threadMng());
296 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
297 }
298
299 DispatchCreator creator(m_trace,this,m_message_queue,m_all_dispatchers);
300 this->createDispatchers(creator);
301 m_io_mng = arcaneCreateIOMng(this);
302}
303
304/*---------------------------------------------------------------------------*/
305/*---------------------------------------------------------------------------*/
306
307/*----------------------------------------------------------------------------*/
308/*---------------------------------------------------------------------------*/
309
312{
313 Trace::Setter mci(m_trace,"Thread");
314 if (m_is_initialized){
315 m_trace->warning() << "HybridParallelMng already initialized";
316 return;
317 }
318
319 m_is_initialized = true;
320}
321
322/*---------------------------------------------------------------------------*/
323/*---------------------------------------------------------------------------*/
324
325SerializeBuffer* HybridParallelMng::
326_castSerializer(ISerializer* serializer)
327{
328 auto sbuf = dynamic_cast<SerializeBuffer*>(serializer);
329 if (!sbuf)
330 ARCANE_THROW(ArgumentException,"can not cast 'ISerializer' to 'SerializeBuffer'");
331 return sbuf;
332}
333
334/*---------------------------------------------------------------------------*/
335/*---------------------------------------------------------------------------*/
336
339{
340 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
341}
342
345{
346 return m_utils_factory->createTransferValuesOperation(this)._release();
347}
348
351{
352 return m_utils_factory->createExchanger(this)._release();
353}
354
355/*---------------------------------------------------------------------------*/
356/*---------------------------------------------------------------------------*/
357
358/*---------------------------------------------------------------------------*/
359/*---------------------------------------------------------------------------*/
360
361void HybridParallelMng::
362sendSerializer(ISerializer* s,Int32 rank)
363{
364 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
365 Request r = m_message_queue->addSend(p2p_message,s);
366 m_message_queue->waitAll(ArrayView<Request>(1,&r));
367}
368
369/*---------------------------------------------------------------------------*/
370/*---------------------------------------------------------------------------*/
371
372auto HybridParallelMng::
373sendSerializer(ISerializer* s,Int32 rank,ByteArray& bytes) -> Request
374{
375 ARCANE_UNUSED(bytes);
376 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
377 return m_message_queue->addSend(p2p_message,s);
378}
379
380/*---------------------------------------------------------------------------*/
381/*---------------------------------------------------------------------------*/
382
385{
386 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
387}
388
389/*---------------------------------------------------------------------------*/
390/*---------------------------------------------------------------------------*/
391
392void HybridParallelMng::
393broadcastSerializer(ISerializer* values,Int32 rank)
394{
395 Timer::Phase tphase(timeStats(),TP_Communication);
396 SerializeBuffer* sbuf = _castSerializer(values);
397
398 bool is_broadcaster = (rank==commRank());
399
400 // Effectue l'envoie en deux phases. Envoie d'abord le nombre d'éléments
401 // puis envoie les éléments.
402 // TODO: il serait possible de le faire en une fois pour les messages
403 // ne dépassant pas une certaine taille.
404
406 if (is_broadcaster){
407 Int64 total_size = sbuf->totalSize();
408 Span<Byte> bytes = sbuf->globalBuffer();
409 this->broadcast(Int64ArrayView(1,&total_size),rank);
410 mpBroadcast(mpm,bytes,rank);
411 }
412 else{
413 Int64 total_size = 0;
414 this->broadcast(Int64ArrayView(1,&total_size),rank);
415 sbuf->preallocate(total_size);
416 Span<Byte> bytes = sbuf->globalBuffer();
417 mpBroadcast(mpm,bytes,rank);
418 sbuf->setFromSizes();
419 }
420}
421
422/*---------------------------------------------------------------------------*/
423/*---------------------------------------------------------------------------*/
424
425void HybridParallelMng::
426recvSerializer(ISerializer* s,Int32 rank)
427{
428 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
429 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
430 m_message_queue->waitAll(ArrayView<Request>(1,&r));
431}
432
433/*---------------------------------------------------------------------------*/
434/*---------------------------------------------------------------------------*/
435
438{
439 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
440}
441
442/*---------------------------------------------------------------------------*/
443/*---------------------------------------------------------------------------*/
444
447{
448 ARCANE_UNUSED(requests);
449 throw NotImplementedException(A_FUNCINFO);
450}
451
452/*---------------------------------------------------------------------------*/
453/*---------------------------------------------------------------------------*/
454
456probe(const PointToPointMessageInfo& message)
457{
458 PointToPointMessageInfo p2p_message(message);
460 return m_message_queue->probe(p2p_message);
461}
462
463/*---------------------------------------------------------------------------*/
464/*---------------------------------------------------------------------------*/
465
468{
469 PointToPointMessageInfo p2p_message(message);
471 return m_message_queue->legacyProbe(p2p_message);
472}
473
474/*---------------------------------------------------------------------------*/
475/*---------------------------------------------------------------------------*/
476
477Request HybridParallelMng::
478sendSerializer(const ISerializer* s,const PointToPointMessageInfo& message)
479{
480 auto p2p_message = buildMessage(message);
481 return m_message_queue->addSend(p2p_message,s);
482}
483
484/*---------------------------------------------------------------------------*/
485/*---------------------------------------------------------------------------*/
486
487Request HybridParallelMng::
488receiveSerializer(ISerializer* s,const PointToPointMessageInfo& message)
489{
490 auto p2p_message = buildMessage(message);
491 return m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
492}
493
494/*---------------------------------------------------------------------------*/
495/*---------------------------------------------------------------------------*/
496
499{
500 if (m_stat)
501 m_stat->print(m_trace);
502}
503
504/*---------------------------------------------------------------------------*/
505/*---------------------------------------------------------------------------*/
506
508barrier()
509{
510 m_thread_barrier->wait();
511 if (m_local_rank==0)
512 m_mpi_parallel_mng->barrier();
513 m_thread_barrier->wait();
514}
515
516/*---------------------------------------------------------------------------*/
517/*---------------------------------------------------------------------------*/
518
519ISerializeMessageList* HybridParallelMng::
520_createSerializeMessageList()
521{
522 auto* x = new MP::internal::SerializeMessageList(messagePassingMng());
523 x->setAllowAnyRankReceive(false);
524 return x;
525}
526
527/*---------------------------------------------------------------------------*/
528/*---------------------------------------------------------------------------*/
529
532{
533 return m_utils_factory->createSynchronizer(this,family)._release();
534}
535
536/*---------------------------------------------------------------------------*/
537/*---------------------------------------------------------------------------*/
538
540createSynchronizer(const ItemGroup& group)
541{
542 return m_utils_factory->createSynchronizer(this,group)._release();
543}
544
545/*---------------------------------------------------------------------------*/
546/*---------------------------------------------------------------------------*/
547
550{
551 return m_utils_factory->createTopology(this)._release();
552}
553
554/*---------------------------------------------------------------------------*/
555/*---------------------------------------------------------------------------*/
556
558replication() const
559{
560 return m_replication;
561}
562
563/*---------------------------------------------------------------------------*/
564/*---------------------------------------------------------------------------*/
565
568{
569 delete m_replication;
570 m_replication = v;
571}
572
573/*---------------------------------------------------------------------------*/
574/*---------------------------------------------------------------------------*/
575
578{
579 return m_sequential_parallel_mng.get();
580}
581
582/*---------------------------------------------------------------------------*/
583/*---------------------------------------------------------------------------*/
584
585Ref<IParallelMng> HybridParallelMng::
586sequentialParallelMngRef()
587{
588 return m_sequential_parallel_mng;
589}
590
591/*---------------------------------------------------------------------------*/
592/*---------------------------------------------------------------------------*/
598{
600 public:
601 RequestList(HybridParallelMng* pm)
602 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
603 m_local_rank(m_parallel_mng->localRank()) {}
604 public:
605 void _wait(Parallel::eWaitType wait_type) override
606 {
607 switch(wait_type){
608 case Parallel::WaitAll:
609 m_parallel_mng->m_message_queue->waitAll(_requests());
610 break;
612 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),false);
613 break;
615 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),true);
616 }
617 }
618 private:
619 HybridParallelMng* m_parallel_mng;
620 HybridMessageQueue* m_message_queue;
622};
623
624/*---------------------------------------------------------------------------*/
625/*---------------------------------------------------------------------------*/
626
633
634/*---------------------------------------------------------------------------*/
635/*---------------------------------------------------------------------------*/
636
639{
640 m_message_queue->waitAll(requests);
641}
642
643/*---------------------------------------------------------------------------*/
644/*---------------------------------------------------------------------------*/
645
648{
649 return m_mpi_parallel_mng->getMPICommunicator();
650}
651
652/*---------------------------------------------------------------------------*/
653/*---------------------------------------------------------------------------*/
654
655MP::Communicator HybridParallelMng::
656communicator() const
657{
658 return m_mpi_parallel_mng->communicator();
659}
660
661/*---------------------------------------------------------------------------*/
662/*---------------------------------------------------------------------------*/
663
666{
667 PointToPointMessageInfo p2p_message(message);
668 p2p_message.setEmiterRank(MessageRank(m_global_rank));
669 return p2p_message;
670}
671
672/*---------------------------------------------------------------------------*/
673/*---------------------------------------------------------------------------*/
674
677{
678 return buildMessage({MessageRank(dest),blocking_mode});
679}
680
681/*---------------------------------------------------------------------------*/
682/*---------------------------------------------------------------------------*/
683
684IParallelMng* HybridParallelMng::
685_createSubParallelMng(Int32ConstArrayView kept_ranks)
686{
687 ARCANE_UNUSED(kept_ranks);
688 ARCANE_THROW(NotSupportedException,"Use createSubParallelMngRef() instead");
689}
690
691/*---------------------------------------------------------------------------*/
692/*---------------------------------------------------------------------------*/
693
696{
697 // ATTENTION: Cette méthode est appelée simultanément par tous les threads
698 // partageant cet HybridParallelMng.
699
700 if (kept_ranks.empty())
701 ARCANE_FATAL("kept_ranks is empty");
702 ARCANE_CHECK_POINTER(m_sub_builder_factory);
703
704 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF";
705
706 /*
707 Il existe plusieurs possibilités:
708 1. on réduit juste le nombre de rangs en mémoire partagé pour chaque
709 processus MPI -< on créé un HybridParallelMng
710 2. On ne garde que le rang maitre de chaque processus MPI -> on créé un MpiParallelMng.
711 3. On ne garde que les rangs d'un même processus -> on créé un SharedMemoryParallelMng
712 4. On ne garde qu'un seul rang: on crée un MpiSequentialParallelMng.
713 */
714 // Pour l'instant, on ne supporte que le cas 1 et 2.
715 Int32 nb_kept_rank = kept_ranks.size();
716
717 // Détermine le nouveau nombre de rangs locaux par rang MPI.
718
719 // Regarde si je suis dans les listes des rangs conservés et si oui
720 // détermine mon rang dans le IParallelMng créé
721 Int32 first_global_rank_in_this_mpi = m_global_rank - m_local_rank;
722 Int32 last_global_rank_in_this_mpi = first_global_rank_in_this_mpi + m_local_nb_rank - 1;
723 // Mon nouveau rang local. Négatif si je ne suis pas dans le nouveau communicateur
724 Int32 my_new_global_rank = (-1);
725 Int32 new_local_nb_rank = 0;
726 Int32 my_new_local_rank = (-1);
727 for( Integer i=0; i<nb_kept_rank; ++i ){
728 Int32 kept_rank = kept_ranks[i];
729 if (kept_rank>=first_global_rank_in_this_mpi && kept_rank<last_global_rank_in_this_mpi)
730 ++new_local_nb_rank;
731 if (kept_rank==m_global_rank){
732 my_new_global_rank = i;
733 my_new_local_rank = new_local_nb_rank - 1;
734 }
735 }
736 bool has_new_rank = (my_new_global_rank != (-1));
737
738 // Calcule le min, le max et la somme sur tous les rangs du nombre de nouveaux.
739 // Deux cas peuvent se présenter:
740 // 1. Le min et le max sont égaux et supérieurs ou égaux à 2: Dans ce cas on créé
741 // un HybridParallelMng.
742 // 2. Le max vaut 1. Dans ce cas on créé un nouveau IParallelMng via le MpiParallelMng.
743 // Les rangs actuels pour lequels 'new_local_nb_rank' vaut 0 ne seront pas dans ce
744 // nouveau communicateur. Ce cas concerne aussi le cas où il ne reste plus qu'un
745 // seul rang à la fin.
746
747 Int32 min_new_local_nb_rank = -1;
748 Int32 max_new_local_nb_rank = -1;
749 Int32 sum_new_local_nb_rank = -1;
750 Int32 min_rank = A_NULL_RANK;
751 Int32 max_rank = A_NULL_RANK;
752 computeMinMaxSum(new_local_nb_rank,min_new_local_nb_rank,max_new_local_nb_rank,
753 sum_new_local_nb_rank,min_rank,max_rank);
754
755 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF new_local_nb_rank=" << new_local_nb_rank
756 << " min=" << min_new_local_nb_rank
757 << " max=" << max_new_local_nb_rank
758 << " sum=" << sum_new_local_nb_rank
759 << " new_global_rank=" << my_new_global_rank;
760
761 // S'il ne reste qu'un seul rang local, alors on construit uniquement un MpiParallelMng.
762 // Seul le PE qui a un nouveau rang est concerné et fait cela
763 if (max_new_local_nb_rank==1){
764 Integer nb_mpi_rank = m_mpi_parallel_mng->commSize();
765 // Il faut calculer les nouveaux rangs MPI.
766 // Si 'min_new_local_nb_rank' vaut 1, alors c'est simple car cela signifie qu'on garde
767 // tous les rangs MPI actuels (on fait l'équivalent d'un MPI_Comm_dup). Sinon, on
768 // récupère pour chaque rang MPI s'il sera dans le nouveau communicateur et on construit
769 // la liste des rangs conservés en fonction de cela.
770 // NOTE: dans tous les cas il faut faire attention qu'un seul thread utilise le
771 // 'm_mpi_parallel_mng'.
772 UniqueArray<Int32> kept_mpi_ranks;
774 bool do_mpi_call = false;
775 if (min_new_local_nb_rank==1){
776 if (has_new_rank){
777 do_mpi_call = true;
778 kept_mpi_ranks.resize(nb_mpi_rank);
779 for( Int32 x=0; x<nb_mpi_rank; ++x )
780 kept_mpi_ranks[x] = x;
781 }
782 }
783 else{
784 // Si je ne suis pas dans le nouveau communicateur, c'est le rang local 0 qui
785 // faut le 'gather'.
786 UniqueArray<Int16> gathered_ranks(nb_mpi_rank);
787 if (has_new_rank || m_local_rank==0){
788 do_mpi_call = true;
789 Int16 v = (has_new_rank) ? 1 : 0;
790 m_mpi_parallel_mng->allGather(ArrayView<Int16>(1,&v),gathered_ranks);
791 }
792 for( Int32 x=0; x<nb_mpi_rank; ++x )
793 if (gathered_ranks[x]==1)
794 kept_mpi_ranks.add(x);
795 }
796 if (do_mpi_call)
797 return m_mpi_parallel_mng->createSubParallelMngRef(kept_mpi_ranks);
798 else
799 return Ref<IParallelMng>();
800 }
801
802 if (max_new_local_nb_rank!=new_local_nb_rank)
803 ARCANE_FATAL("Not same number of new local ranks on every MPI processus: current={0} max={1}",
804 new_local_nb_rank,max_new_local_nb_rank);
805
806 if (max_new_local_nb_rank<2)
807 ARCANE_FATAL("number of local ranks is too low current={0} minimum=2",new_local_nb_rank);
808
809 // Met une barrière locale pour être sur que tout le monde attend ici.
810 m_thread_barrier->wait();
811
812 // NOTE: Le builder contient les parties communes aux IParallelMng créés. Il faut
813 // donc que ces derniers gardent une référence dessus sinon il sera détruit à la fin
814 // de cette méthode.
816
817 // Le rang 0 créé le builder
818 if (m_local_rank==0){
819 // Suppose qu'on à le même nombre de rangs MPI qu'avant donc on utilise
820 // le communicateur MPI qu'on a déjà.
821 MP::Communicator c = communicator();
822 builder = m_sub_builder_factory->_createParallelMngBuilder(new_local_nb_rank,c);
823 // Positionne le builder pour tout le monde
824 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
825 }
826 // Attend pour être sur que tous les threads voit le bon builder.
827 m_thread_barrier->wait();
828
829 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
830 ARCANE_CHECK_POINTER(builder.get());
831
832 Ref<IParallelMng> new_parallel_mng;
833 if (my_new_local_rank>=0){
834 new_parallel_mng = builder->_createParallelMng(my_new_local_rank,traceMng());
835 }
836 m_thread_barrier->wait();
837
838 // Ici, tout le monde a créé son IParallelMng. On peut donc
839 // supprimer la référence au builder. Les IParallelMng créés gardent
840 // une référence au builder
841 if (m_local_rank==0){
842 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
843 }
844 m_thread_barrier->wait();
845
846 return new_parallel_mng;
847}
848
849/*---------------------------------------------------------------------------*/
850/*---------------------------------------------------------------------------*/
851
854{
855 return m_utils_factory;
856}
857
858/*---------------------------------------------------------------------------*/
859/*---------------------------------------------------------------------------*/
860
861bool HybridParallelMng::
862_isAcceleratorAware() const
863{
864 return m_mpi_parallel_mng->_internalApi()->isAcceleratorAware();
865}
866
867/*---------------------------------------------------------------------------*/
868/*---------------------------------------------------------------------------*/
869
870} // End namespace Arcane::MessagePassing
871
872/*---------------------------------------------------------------------------*/
873/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Exception lorsqu'un argument est invalide.
Vue modifiable d'un tableau d'un type T.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
constexpr bool empty() const noexcept
true si le tableau est vide (size()==0)
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:36
Interface d'une famille d'entités.
Definition IItemFamily.h:84
Échange d'informations entre processeurs.
virtual bool isAcceleratorAware() const =0
Indique si l'implémentation gère les accélérateurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface du gestionnaire de traces.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Interface d'une file de messages avec les threads.
Ref< IMachineMemoryWindowBaseInternal > createMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Implémentation de IRequestList pour HybridParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant les threads.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
void freeRequests(ArrayView< Request > requests) override
Libère les requêtes.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void barrier() override
Effectue une barière.
MP::Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
bool m_is_initialized
true si déjà initialisé
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void build() override
Construit l'instance.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
void initialize() override
Initialise le gestionnaire du parallélisme.
IThreadMng * threadMng() const override
Gestionnaire de threads.
Int32 m_global_nb_rank
Nombre de rangs globaux.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
Int32 m_local_rank
Rang local du processeur actuel.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
IParallelReplication * replication() const override
Informations sur la réplication.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Int32 m_local_nb_rank
Nombre de rang locaux.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
Int32 commRank() const override
Rang de cette instance dans le communicateur.
Int32 m_global_rank
Numéro du processeur actuel.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Integer waitMessages(Parallel::eWaitType wait_type) override
Attend que les messages aient terminé leur exécution.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
Interface du gestionnaire des échanges de messages.
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
void setEmiterRank(MessageRank rank)
Positionne le rang de l'émetteur du message.
Informations des buffers de réception.
Requête d'un message.
Definition Request.h:77
Classe de base d'une liste de requêtes.
IParallelMngInternal * _internalApi() override
API interne à Arcane.
Exception lorsqu'une fonction n'est pas implémentée.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Implémentation d'un tampon pour la sérialisation.
Vue d'un tableau d'éléments de type T.
Definition Span.h:513
Gestionnaire de timer.
Definition TimerMng.h:39
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
C void mpBroadcast(IMessagePassingMng *pm, Span< char > send_buf, Int32 rank)
@ WaitSome
Attend que tous les messages de la liste soient traités.
eBlockingType
Type indiquant si un message est bloquant ou non.
Implémentation de la concurrence.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:538
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:208
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:569
std::int16_t Int16
Type entier signé sur 16 bits.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un HybridParallelMng.
Infos pour construire un SequentialParallelMng.