Arcane  v4.1.4.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
HybridParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* HybridParallelMng.cc (C) 2000-2026 */
9/* */
10/* Gestionnaire de parallélisme utilisant un mixte MPI/Threads. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/mpithread/HybridParallelMng.h"
15
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/NumericTypes.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/IThreadBarrier.h"
22#include "arcane/utils/ITraceMng.h"
23
24#include "arcane/core/IIOMng.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ISerializeMessageList.h"
27#include "arcane/core/IItemFamily.h"
28#include "arcane/core/internal/ParallelMngInternal.h"
29#include "arcane/core/internal/SerializeMessage.h"
30#include "arcane/core/internal/DynamicMachineMemoryWindowMemoryAllocator.h"
31#include "arcane/core/parallel/IStat.h"
32
33#include "arcane/parallel/mpithread/HybridParallelDispatch.h"
34#include "arcane/parallel/mpithread/HybridMessageQueue.h"
35#include "arcane/parallel/mpithread/internal/HybridMachineMemoryWindowBaseInternalCreator.h"
36#include "arcane/parallel/mpithread/internal/HybridMachineMemoryWindowBaseInternal.h"
37#include "arcane/parallel/mpithread/internal/HybridDynamicMachineMemoryWindowBaseInternal.h"
38
39#include "arcane/parallel/mpi/MpiParallelMng.h"
40
41#include "arcane/impl/TimerMng.h"
42#include "arcane/impl/ParallelReplication.h"
43#include "arcane/impl/SequentialParallelMng.h"
44#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
45
47#include "arccore/message_passing/RequestListBase.h"
48#include "arccore/message_passing/internal/SerializeMessageList.h"
49
50/*---------------------------------------------------------------------------*/
51/*---------------------------------------------------------------------------*/
52
53namespace Arcane
54{
55extern "C++" IIOMng*
56arcaneCreateIOMng(IParallelMng* psm);
57}
58
60{
61
62/*---------------------------------------------------------------------------*/
63/*---------------------------------------------------------------------------*/
64
65// NOTE: Cette classe n'est plus utilisée. Elle reste pour référence
66// et sera supprimée ultérieurement
67class HybridSerializeMessageList
69{
70 public:
71 class HybridSerializeMessageRequest
72 {
73 public:
74 HybridSerializeMessageRequest(ISerializeMessage* message,Request request)
75 : m_message(message), m_request(request){}
76 public:
77 ISerializeMessage* m_message = nullptr;
78 Request m_request;
79 };
80
81 public:
82
83 explicit HybridSerializeMessageList(HybridParallelMng* mpm)
84 : m_parallel_mng(mpm), m_trace(mpm->traceMng())
85 {
86 }
87
88 public:
89
90 void addMessage(ISerializeMessage* msg) override
91 {
92 m_messages_to_process.add(msg);
93
94 }
95 void processPendingMessages() override
96 {
97 }
98
100 {
101 switch(wait_type){
102 case Parallel::WaitAll:
103 // Pour l'instant seul le mode bloquant est supporté.
104 //m_parallel_mng->processMessages(m_messages_to_process);
105 _wait(Parallel::WaitAll);
106 m_messages_to_process.clear();
107 return (-1);
111 ARCANE_THROW(NotImplementedException,"WaitSomeNonBlocking");
112 }
113 return (-1);
114 }
115
116 private:
117
118 HybridParallelMng* m_parallel_mng;
119 ITraceMng* m_trace;
120 UniqueArray<ISerializeMessage*> m_messages_to_process;
121
122 void _waitAll();
123 void _wait(Parallel::eWaitType wait_mode);
124};
125
126/*---------------------------------------------------------------------------*/
127/*---------------------------------------------------------------------------*/
128
129void HybridSerializeMessageList::
130_wait(Parallel::eWaitType wait_mode)
131{
132 m_trace->info() << "BEGIN PROCESS MESSAGES";
133
134 // TODO: gérer la memoire sans faire de new.
135 ConstArrayView<ISerializeMessage*> messages = m_messages_to_process;
136 HybridMessageQueue* message_queue = m_parallel_mng->m_message_queue;
137 UniqueArray<Request> all_requests;
138 MessageTag HYBRID_MESSAGE_TAG(511);
139 for( ISerializeMessage* sm : messages ){
140 ISerializer* s = sm->serializer();
141 MessageRank orig(sm->source());
142 MessageRank dest(sm->destination());
143 PointToPointMessageInfo message_info(orig,dest,HYBRID_MESSAGE_TAG,Parallel::NonBlocking);
144 Request r;
145 if (sm->isSend())
146 r = message_queue->addSend(message_info,SendBufferInfo(s));
147 else
148 r = message_queue->addReceive(message_info,ReceiveBufferInfo(s));
149 all_requests.add(r);
150 }
151
152 if (wait_mode==Parallel::WaitAll)
153 message_queue->waitAll(all_requests);
154
155 for( ISerializeMessage* sm : messages )
156 sm->setFinished(true);
157}
158
159/*---------------------------------------------------------------------------*/
160/*---------------------------------------------------------------------------*/
161
162/*---------------------------------------------------------------------------*/
163/*---------------------------------------------------------------------------*/
164
166: public ParallelMngInternal
167{
168 public:
169
170 explicit Impl(HybridParallelMng* pm, HybridMachineMemoryWindowBaseInternalCreator* window_creator)
171 : ParallelMngInternal(pm)
172 , m_parallel_mng(pm)
173 , m_window_creator(window_creator)
175 {}
176
177 ~Impl() override = default;
178
179 public:
180
182 {
183 return makeRef(m_window_creator->createWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
184 }
185
187 {
188 return makeRef(m_window_creator->createDynamicWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
189 }
190
191 IMemoryAllocator* dynamicMachineMemoryWindowMemoryAllocator() override
192 {
193 return m_alloc.get();
194 }
195
196 private:
197
198 HybridParallelMng* m_parallel_mng;
201};
202
203/*---------------------------------------------------------------------------*/
204/*---------------------------------------------------------------------------*/
205
206/*---------------------------------------------------------------------------*/
207/*---------------------------------------------------------------------------*/
208
209HybridParallelMng::
210HybridParallelMng(const HybridParallelMngBuildInfo& bi)
211: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.local_rank,bi.local_nb_rank))
212, m_trace(bi.trace_mng)
213, m_thread_mng(bi.thread_mng)
214, m_world_parallel_mng(bi.world_parallel_mng)
215, m_io_mng(nullptr)
216, m_timer_mng(nullptr)
217, m_replication(new ParallelReplication())
218, m_message_queue(new HybridMessageQueue(bi.message_queue,bi.mpi_parallel_mng,bi.local_nb_rank))
219, m_is_initialized(false)
220, m_stat(Parallel::createDefaultStat())
221, m_thread_barrier(bi.thread_barrier)
222, m_mpi_parallel_mng(bi.mpi_parallel_mng)
223, m_all_dispatchers(bi.all_dispatchers)
224, m_sub_builder_factory(bi.sub_builder_factory)
225, m_parent_container_ref(bi.container)
226, m_utils_factory(createRef<ParallelMngUtilsFactoryBase>())
227, m_parallel_mng_internal(new Impl(this, bi.window_creator))
228{
229 if (!m_world_parallel_mng)
230 m_world_parallel_mng = this;
231
232 // TODO: vérifier que tous les autres HybridParallelMng ont bien
233 // le même nombre de rang locaux (m_local_nb_rank)
234 m_local_rank = bi.local_rank;
235 m_local_nb_rank = bi.local_nb_rank;
236
237 Int32 mpi_rank = m_mpi_parallel_mng->commRank();
238 Int32 mpi_size = m_mpi_parallel_mng->commSize();
239
242
243 m_is_parallel = m_global_nb_rank!=1;
244}
245
246/*---------------------------------------------------------------------------*/
247/*---------------------------------------------------------------------------*/
248
249HybridParallelMng::
250~HybridParallelMng()
251{
252 m_sequential_parallel_mng.reset();
253 delete m_replication;
254 delete m_io_mng;
255 delete m_message_queue;
256 delete m_timer_mng;
257 delete m_stat;
258 delete m_mpi_parallel_mng;
259 delete m_parallel_mng_internal;
260}
261
262/*---------------------------------------------------------------------------*/
263/*---------------------------------------------------------------------------*/
264
265namespace
266{
267// Classe pour créer les différents dispatchers
268class DispatchCreator
269{
270 public:
271 DispatchCreator(ITraceMng* tm,HybridParallelMng* mpm,HybridMessageQueue* message_queue,MpiThreadAllDispatcher* all_dispatchers)
272 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue), m_all_dispatchers(all_dispatchers){}
273 public:
274 template<typename DataType> HybridParallelDispatch<DataType>*
275 create()
276 {
277 HybridMessageQueue* tmq = m_message_queue;
278 MpiThreadAllDispatcher* ad = m_all_dispatchers;
279 auto field = ad->instance((DataType*)nullptr).view();
280 return new HybridParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
281 }
282
283 ITraceMng* m_tm;
284 HybridParallelMng* m_mpm;
285 HybridMessageQueue* m_message_queue;
286 MpiThreadAllDispatcher* m_all_dispatchers;
287};
288}
289
290/*---------------------------------------------------------------------------*/
291/*---------------------------------------------------------------------------*/
292
294build()
295{
296 ITraceMng* tm = traceMng();
297 tm->info() << "Initialise HybridParallelMng"
298 << " global_rank=" << m_global_rank
299 << " local_rank=" << m_local_rank
300 << " mpi_rank=" << m_mpi_parallel_mng->commRank();
301
302 m_timer_mng = new TimerMng(tm);
303
304 // Créé le gestionnaire séquentiel associé.
305 {
307 bi.setTraceMng(traceMng());
308 bi.setCommunicator(communicator());
309 bi.setThreadMng(threadMng());
310 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
311 }
312
313 DispatchCreator creator(m_trace,this,m_message_queue,m_all_dispatchers);
314 this->createDispatchers(creator);
315 m_io_mng = arcaneCreateIOMng(this);
316}
317
318/*---------------------------------------------------------------------------*/
319/*---------------------------------------------------------------------------*/
320
321/*----------------------------------------------------------------------------*/
322/*---------------------------------------------------------------------------*/
323
326{
327 Trace::Setter mci(m_trace,"Thread");
328 if (m_is_initialized){
329 m_trace->warning() << "HybridParallelMng already initialized";
330 return;
331 }
332
333 m_is_initialized = true;
334}
335
336/*---------------------------------------------------------------------------*/
337/*---------------------------------------------------------------------------*/
338
339SerializeBuffer* HybridParallelMng::
340_castSerializer(ISerializer* serializer)
341{
342 auto sbuf = dynamic_cast<SerializeBuffer*>(serializer);
343 if (!sbuf)
344 ARCANE_THROW(ArgumentException,"can not cast 'ISerializer' to 'SerializeBuffer'");
345 return sbuf;
346}
347
348/*---------------------------------------------------------------------------*/
349/*---------------------------------------------------------------------------*/
350
353{
354 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
355}
356
359{
360 return m_utils_factory->createTransferValuesOperation(this)._release();
361}
362
365{
366 return m_utils_factory->createExchanger(this)._release();
367}
368
369/*---------------------------------------------------------------------------*/
370/*---------------------------------------------------------------------------*/
371
372/*---------------------------------------------------------------------------*/
373/*---------------------------------------------------------------------------*/
374
375void HybridParallelMng::
376sendSerializer(ISerializer* s,Int32 rank)
377{
378 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
379 Request r = m_message_queue->addSend(p2p_message,s);
380 m_message_queue->waitAll(ArrayView<Request>(1,&r));
381}
382
383/*---------------------------------------------------------------------------*/
384/*---------------------------------------------------------------------------*/
385
386auto HybridParallelMng::
387sendSerializer(ISerializer* s,Int32 rank,ByteArray& bytes) -> Request
388{
389 ARCANE_UNUSED(bytes);
390 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
391 return m_message_queue->addSend(p2p_message,s);
392}
393
394/*---------------------------------------------------------------------------*/
395/*---------------------------------------------------------------------------*/
396
399{
400 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
401}
402
403/*---------------------------------------------------------------------------*/
404/*---------------------------------------------------------------------------*/
405
406void HybridParallelMng::
407broadcastSerializer(ISerializer* values,Int32 rank)
408{
409 Timer::Phase tphase(timeStats(),TP_Communication);
410 SerializeBuffer* sbuf = _castSerializer(values);
411
412 bool is_broadcaster = (rank==commRank());
413
414 // Effectue l'envoie en deux phases. Envoie d'abord le nombre d'éléments
415 // puis envoie les éléments.
416 // TODO: il serait possible de le faire en une fois pour les messages
417 // ne dépassant pas une certaine taille.
418
420 if (is_broadcaster){
421 Int64 total_size = sbuf->totalSize();
422 Span<Byte> bytes = sbuf->globalBuffer();
423 this->broadcast(Int64ArrayView(1,&total_size),rank);
424 mpBroadcast(mpm,bytes,rank);
425 }
426 else{
427 Int64 total_size = 0;
428 this->broadcast(Int64ArrayView(1,&total_size),rank);
429 sbuf->preallocate(total_size);
430 Span<Byte> bytes = sbuf->globalBuffer();
431 mpBroadcast(mpm,bytes,rank);
432 sbuf->setFromSizes();
433 }
434}
435
436/*---------------------------------------------------------------------------*/
437/*---------------------------------------------------------------------------*/
438
439void HybridParallelMng::
440recvSerializer(ISerializer* s,Int32 rank)
441{
442 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
443 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
444 m_message_queue->waitAll(ArrayView<Request>(1,&r));
445}
446
447/*---------------------------------------------------------------------------*/
448/*---------------------------------------------------------------------------*/
449
452{
453 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
454}
455
456/*---------------------------------------------------------------------------*/
457/*---------------------------------------------------------------------------*/
458
461{
462 ARCANE_UNUSED(requests);
463 throw NotImplementedException(A_FUNCINFO);
464}
465
466/*---------------------------------------------------------------------------*/
467/*---------------------------------------------------------------------------*/
468
470probe(const PointToPointMessageInfo& message)
471{
472 PointToPointMessageInfo p2p_message(message);
474 return m_message_queue->probe(p2p_message);
475}
476
477/*---------------------------------------------------------------------------*/
478/*---------------------------------------------------------------------------*/
479
482{
483 PointToPointMessageInfo p2p_message(message);
485 return m_message_queue->legacyProbe(p2p_message);
486}
487
488/*---------------------------------------------------------------------------*/
489/*---------------------------------------------------------------------------*/
490
491Request HybridParallelMng::
492sendSerializer(const ISerializer* s,const PointToPointMessageInfo& message)
493{
494 auto p2p_message = buildMessage(message);
495 return m_message_queue->addSend(p2p_message,s);
496}
497
498/*---------------------------------------------------------------------------*/
499/*---------------------------------------------------------------------------*/
500
501Request HybridParallelMng::
502receiveSerializer(ISerializer* s,const PointToPointMessageInfo& message)
503{
504 auto p2p_message = buildMessage(message);
505 return m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
506}
507
508/*---------------------------------------------------------------------------*/
509/*---------------------------------------------------------------------------*/
510
513{
514 if (m_stat)
515 m_stat->print(m_trace);
516}
517
518/*---------------------------------------------------------------------------*/
519/*---------------------------------------------------------------------------*/
520
522barrier()
523{
524 m_thread_barrier->wait();
525 if (m_local_rank==0)
526 m_mpi_parallel_mng->barrier();
527 m_thread_barrier->wait();
528}
529
530/*---------------------------------------------------------------------------*/
531/*---------------------------------------------------------------------------*/
532
533ISerializeMessageList* HybridParallelMng::
534_createSerializeMessageList()
535{
536 auto* x = new MP::internal::SerializeMessageList(messagePassingMng());
537 x->setAllowAnyRankReceive(false);
538 return x;
539}
540
541/*---------------------------------------------------------------------------*/
542/*---------------------------------------------------------------------------*/
543
546{
547 return m_utils_factory->createSynchronizer(this,family)._release();
548}
549
550/*---------------------------------------------------------------------------*/
551/*---------------------------------------------------------------------------*/
552
554createSynchronizer(const ItemGroup& group)
555{
556 return m_utils_factory->createSynchronizer(this,group)._release();
557}
558
559/*---------------------------------------------------------------------------*/
560/*---------------------------------------------------------------------------*/
561
564{
565 return m_utils_factory->createTopology(this)._release();
566}
567
568/*---------------------------------------------------------------------------*/
569/*---------------------------------------------------------------------------*/
570
572replication() const
573{
574 return m_replication;
575}
576
577/*---------------------------------------------------------------------------*/
578/*---------------------------------------------------------------------------*/
579
582{
583 delete m_replication;
584 m_replication = v;
585}
586
587/*---------------------------------------------------------------------------*/
588/*---------------------------------------------------------------------------*/
589
592{
593 return m_sequential_parallel_mng.get();
594}
595
596/*---------------------------------------------------------------------------*/
597/*---------------------------------------------------------------------------*/
598
599Ref<IParallelMng> HybridParallelMng::
600sequentialParallelMngRef()
601{
602 return m_sequential_parallel_mng;
603}
604
605/*---------------------------------------------------------------------------*/
606/*---------------------------------------------------------------------------*/
612{
614 public:
615 RequestList(HybridParallelMng* pm)
616 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
617 m_local_rank(m_parallel_mng->localRank()) {}
618 public:
619 void _wait(Parallel::eWaitType wait_type) override
620 {
621 switch(wait_type){
622 case Parallel::WaitAll:
623 m_parallel_mng->m_message_queue->waitAll(_requests());
624 break;
626 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),false);
627 break;
629 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),true);
630 }
631 }
632 private:
633 HybridParallelMng* m_parallel_mng;
634 HybridMessageQueue* m_message_queue;
636};
637
638/*---------------------------------------------------------------------------*/
639/*---------------------------------------------------------------------------*/
640
647
648/*---------------------------------------------------------------------------*/
649/*---------------------------------------------------------------------------*/
650
653{
654 m_message_queue->waitAll(requests);
655}
656
657/*---------------------------------------------------------------------------*/
658/*---------------------------------------------------------------------------*/
659
662{
663 return m_mpi_parallel_mng->getMPICommunicator();
664}
665
666/*---------------------------------------------------------------------------*/
667/*---------------------------------------------------------------------------*/
668
669MP::Communicator HybridParallelMng::
670communicator() const
671{
672 return m_mpi_parallel_mng->communicator();
673}
674
675/*---------------------------------------------------------------------------*/
676/*---------------------------------------------------------------------------*/
677
678MP::Communicator HybridParallelMng::
680{
681 return m_mpi_parallel_mng->machineCommunicator();
682}
683
684/*---------------------------------------------------------------------------*/
685/*---------------------------------------------------------------------------*/
686
689{
690 PointToPointMessageInfo p2p_message(message);
691 p2p_message.setEmiterRank(MessageRank(m_global_rank));
692 return p2p_message;
693}
694
695/*---------------------------------------------------------------------------*/
696/*---------------------------------------------------------------------------*/
697
700{
701 return buildMessage({MessageRank(dest),blocking_mode});
702}
703
704/*---------------------------------------------------------------------------*/
705/*---------------------------------------------------------------------------*/
706
707IParallelMng* HybridParallelMng::
708_createSubParallelMng(Int32ConstArrayView kept_ranks)
709{
710 ARCANE_UNUSED(kept_ranks);
711 ARCANE_THROW(NotSupportedException,"Use createSubParallelMngRef() instead");
712}
713
714/*---------------------------------------------------------------------------*/
715/*---------------------------------------------------------------------------*/
716
719{
720 // ATTENTION: Cette méthode est appelée simultanément par tous les threads
721 // partageant cet HybridParallelMng.
722
723 if (kept_ranks.empty())
724 ARCANE_FATAL("kept_ranks is empty");
725 ARCANE_CHECK_POINTER(m_sub_builder_factory);
726
727 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF";
728
729 /*
730 Il existe plusieurs possibilités:
731 1. on réduit juste le nombre de rangs en mémoire partagé pour chaque
732 processus MPI -< on créé un HybridParallelMng
733 2. On ne garde que le rang maitre de chaque processus MPI -> on créé un MpiParallelMng.
734 3. On ne garde que les rangs d'un même processus -> on créé un SharedMemoryParallelMng
735 4. On ne garde qu'un seul rang: on crée un MpiSequentialParallelMng.
736 */
737 // Pour l'instant, on ne supporte que le cas 1 et 2.
738 Int32 nb_kept_rank = kept_ranks.size();
739
740 // Détermine le nouveau nombre de rangs locaux par rang MPI.
741
742 // Regarde si je suis dans les listes des rangs conservés et si oui
743 // détermine mon rang dans le IParallelMng créé
744 Int32 first_global_rank_in_this_mpi = m_global_rank - m_local_rank;
745 Int32 last_global_rank_in_this_mpi = first_global_rank_in_this_mpi + m_local_nb_rank - 1;
746 // Mon nouveau rang local. Négatif si je ne suis pas dans le nouveau communicateur
747 Int32 my_new_global_rank = (-1);
748 Int32 new_local_nb_rank = 0;
749 Int32 my_new_local_rank = (-1);
750 for( Integer i=0; i<nb_kept_rank; ++i ){
751 Int32 kept_rank = kept_ranks[i];
752 if (kept_rank>=first_global_rank_in_this_mpi && kept_rank<last_global_rank_in_this_mpi)
753 ++new_local_nb_rank;
754 if (kept_rank==m_global_rank){
755 my_new_global_rank = i;
756 my_new_local_rank = new_local_nb_rank - 1;
757 }
758 }
759 bool has_new_rank = (my_new_global_rank != (-1));
760
761 // Calcule le min, le max et la somme sur tous les rangs du nombre de nouveaux.
762 // Deux cas peuvent se présenter:
763 // 1. Le min et le max sont égaux et supérieurs ou égaux à 2: Dans ce cas on créé
764 // un HybridParallelMng.
765 // 2. Le max vaut 1. Dans ce cas on créé un nouveau IParallelMng via le MpiParallelMng.
766 // Les rangs actuels pour lequels 'new_local_nb_rank' vaut 0 ne seront pas dans ce
767 // nouveau communicateur. Ce cas concerne aussi le cas où il ne reste plus qu'un
768 // seul rang à la fin.
769
770 Int32 min_new_local_nb_rank = -1;
771 Int32 max_new_local_nb_rank = -1;
772 Int32 sum_new_local_nb_rank = -1;
773 Int32 min_rank = A_NULL_RANK;
774 Int32 max_rank = A_NULL_RANK;
775 computeMinMaxSum(new_local_nb_rank,min_new_local_nb_rank,max_new_local_nb_rank,
776 sum_new_local_nb_rank,min_rank,max_rank);
777
778 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF new_local_nb_rank=" << new_local_nb_rank
779 << " min=" << min_new_local_nb_rank
780 << " max=" << max_new_local_nb_rank
781 << " sum=" << sum_new_local_nb_rank
782 << " new_global_rank=" << my_new_global_rank;
783
784 // S'il ne reste qu'un seul rang local, alors on construit uniquement un MpiParallelMng.
785 // Seul le PE qui a un nouveau rang est concerné et fait cela
786 if (max_new_local_nb_rank==1){
787 Integer nb_mpi_rank = m_mpi_parallel_mng->commSize();
788 // Il faut calculer les nouveaux rangs MPI.
789 // Si 'min_new_local_nb_rank' vaut 1, alors c'est simple car cela signifie qu'on garde
790 // tous les rangs MPI actuels (on fait l'équivalent d'un MPI_Comm_dup). Sinon, on
791 // récupère pour chaque rang MPI s'il sera dans le nouveau communicateur et on construit
792 // la liste des rangs conservés en fonction de cela.
793 // NOTE: dans tous les cas il faut faire attention qu'un seul thread utilise le
794 // 'm_mpi_parallel_mng'.
795 UniqueArray<Int32> kept_mpi_ranks;
797 bool do_mpi_call = false;
798 if (min_new_local_nb_rank==1){
799 if (has_new_rank){
800 do_mpi_call = true;
801 kept_mpi_ranks.resize(nb_mpi_rank);
802 for( Int32 x=0; x<nb_mpi_rank; ++x )
803 kept_mpi_ranks[x] = x;
804 }
805 }
806 else{
807 // Si je ne suis pas dans le nouveau communicateur, c'est le rang local 0 qui
808 // faut le 'gather'.
809 UniqueArray<Int16> gathered_ranks(nb_mpi_rank);
810 if (has_new_rank || m_local_rank==0){
811 do_mpi_call = true;
812 Int16 v = (has_new_rank) ? 1 : 0;
813 m_mpi_parallel_mng->allGather(ArrayView<Int16>(1,&v),gathered_ranks);
814 }
815 for( Int32 x=0; x<nb_mpi_rank; ++x )
816 if (gathered_ranks[x]==1)
817 kept_mpi_ranks.add(x);
818 }
819 if (do_mpi_call)
820 return m_mpi_parallel_mng->createSubParallelMngRef(kept_mpi_ranks);
821 else
822 return Ref<IParallelMng>();
823 }
824
825 if (max_new_local_nb_rank!=new_local_nb_rank)
826 ARCANE_FATAL("Not same number of new local ranks on every MPI processus: current={0} max={1}",
827 new_local_nb_rank,max_new_local_nb_rank);
828
829 if (max_new_local_nb_rank<2)
830 ARCANE_FATAL("number of local ranks is too low current={0} minimum=2",new_local_nb_rank);
831
832 // Met une barrière locale pour être sur que tout le monde attend ici.
833 m_thread_barrier->wait();
834
835 // NOTE: Le builder contient les parties communes aux IParallelMng créés. Il faut
836 // donc que ces derniers gardent une référence dessus sinon il sera détruit à la fin
837 // de cette méthode.
839
840 // Le rang 0 créé le builder
841 if (m_local_rank==0){
842 // Suppose qu'on à le même nombre de rangs MPI qu'avant donc on utilise
843 // le communicateur MPI qu'on a déjà.
844 MP::Communicator c = communicator();
845 MP::Communicator mc = machineCommunicator();
846 builder = m_sub_builder_factory->_createParallelMngBuilder(new_local_nb_rank, c, mc);
847 // Positionne le builder pour tout le monde
848 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
849 }
850 // Attend pour être sur que tous les threads voit le bon builder.
851 m_thread_barrier->wait();
852
853 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
854 ARCANE_CHECK_POINTER(builder.get());
855
856 Ref<IParallelMng> new_parallel_mng;
857 if (my_new_local_rank>=0){
858 new_parallel_mng = builder->_createParallelMng(my_new_local_rank,traceMng());
859 }
860 m_thread_barrier->wait();
861
862 // Ici, tout le monde a créé son IParallelMng. On peut donc
863 // supprimer la référence au builder. Les IParallelMng créés gardent
864 // une référence au builder
865 if (m_local_rank==0){
866 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
867 }
868 m_thread_barrier->wait();
869
870 return new_parallel_mng;
871}
872
873/*---------------------------------------------------------------------------*/
874/*---------------------------------------------------------------------------*/
875
878{
879 return m_utils_factory;
880}
881
882/*---------------------------------------------------------------------------*/
883/*---------------------------------------------------------------------------*/
884
885bool HybridParallelMng::
886_isAcceleratorAware() const
887{
888 return m_mpi_parallel_mng->_internalApi()->isAcceleratorAware();
889}
890
891/*---------------------------------------------------------------------------*/
892/*---------------------------------------------------------------------------*/
893
894} // End namespace Arcane::MessagePassing
895
896/*---------------------------------------------------------------------------*/
897/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Exception lorsqu'un argument est invalide.
Vue modifiable d'un tableau d'un type T.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
constexpr bool empty() const noexcept
true si le tableau est vide (size()==0)
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:36
Interface d'une famille d'entités.
Definition IItemFamily.h:84
Interface d'un allocateur pour la mémoire.
Échange d'informations entre processeurs.
virtual bool isAcceleratorAware() const =0
Indique si l'implémentation gère les accélérateurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface du gestionnaire de traces.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Interface d'une file de messages avec les threads.
Ref< IMachineMemoryWindowBaseInternal > createMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Ref< IDynamicMachineMemoryWindowBaseInternal > createDynamicMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire dynamique sur le noeud.
Implémentation de IRequestList pour HybridParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant les threads.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
void freeRequests(ArrayView< Request > requests) override
Libère les requêtes.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void barrier() override
Effectue une barière.
MP::Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
bool m_is_initialized
true si déjà initialisé
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void build() override
Construit l'instance.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
void initialize() override
Initialise le gestionnaire du parallélisme.
IThreadMng * threadMng() const override
Gestionnaire de threads.
Int32 m_global_nb_rank
Nombre de rangs globaux.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
Int32 m_local_rank
Rang local du processeur actuel.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
IParallelReplication * replication() const override
Informations sur la réplication.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
MP::Communicator machineCommunicator() const override
Communicateur MPI issus du communicateur communicator() réunissant tous les processus du noeud de cal...
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Int32 m_local_nb_rank
Nombre de rang locaux.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
Int32 commRank() const override
Rang de cette instance dans le communicateur.
Int32 m_global_rank
Numéro du processeur actuel.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Integer waitMessages(Parallel::eWaitType wait_type) override
Attend que les messages aient terminé leur exécution.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
Interface du gestionnaire des échanges de messages.
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
void setEmiterRank(MessageRank rank)
Positionne le rang de l'émetteur du message.
Informations des buffers de réception.
Requête d'un message.
Definition Request.h:77
Classe de base d'une liste de requêtes.
IParallelMngInternal * _internalApi() override
API interne à Arcane.
Exception lorsqu'une fonction n'est pas implémentée.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Implémentation d'un tampon pour la sérialisation.
Vue d'un tableau d'éléments de type T.
Definition Span.h:633
Gestionnaire de timer.
Definition TimerMng.h:39
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
C void mpBroadcast(IMessagePassingMng *pm, Span< char > send_buf, Int32 rank)
@ WaitSome
Attend que tous les messages de la liste soient traités.
eBlockingType
Type indiquant si un message est bloquant ou non.
Implémentation de la concurrence.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:451
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:121
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:482
std::int16_t Int16
Type entier signé sur 16 bits.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un HybridParallelMng.
Infos pour construire un SequentialParallelMng.