Arcane  v4.1.8.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
HybridParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* HybridParallelMng.cc (C) 2000-2026 */
9/* */
10/* Gestionnaire de parallélisme utilisant un mixte MPI/Threads. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/mpithread/HybridParallelMng.h"
15
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/NumericTypes.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/IThreadBarrier.h"
22#include "arcane/utils/ITraceMng.h"
23
24#include "arcane/core/IIOMng.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ISerializeMessageList.h"
27#include "arcane/core/IItemFamily.h"
28#include "arcane/core/IParallelTopology.h"
29#include "arcane/core/internal/ParallelMngInternal.h"
30#include "arcane/core/internal/SerializeMessage.h"
31#include "arcane/core/internal/MachineShMemWinMemoryAllocator.h"
32#include "arcane/core/parallel/IStat.h"
33
34#include "arcane/parallel/mpithread/HybridParallelDispatch.h"
35#include "arcane/parallel/mpithread/HybridMessageQueue.h"
36#include "arcane/parallel/mpithread/internal/HybridMachineShMemWinBaseInternalCreator.h"
37#include "arcane/parallel/mpithread/internal/HybridContigMachineShMemWinBaseInternal.h"
38#include "arcane/parallel/mpithread/internal/HybridMachineShMemWinBaseInternal.h"
39
40#include "arcane/parallel/mpi/MpiParallelMng.h"
41
42#include "arcane/impl/TimerMng.h"
43#include "arcane/impl/ParallelReplication.h"
44#include "arcane/impl/SequentialParallelMng.h"
45#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
46
48#include "arccore/message_passing/RequestListBase.h"
49#include "arccore/message_passing/internal/SerializeMessageList.h"
50
51/*---------------------------------------------------------------------------*/
52/*---------------------------------------------------------------------------*/
53
54namespace Arcane
55{
56extern "C++" IIOMng*
57arcaneCreateIOMng(IParallelMng* psm);
58}
59
61{
62
63/*---------------------------------------------------------------------------*/
64/*---------------------------------------------------------------------------*/
65
66// NOTE: Cette classe n'est plus utilisée. Elle reste pour référence
67// et sera supprimée ultérieurement
68class HybridSerializeMessageList
70{
71 public:
72 class HybridSerializeMessageRequest
73 {
74 public:
75 HybridSerializeMessageRequest(ISerializeMessage* message,Request request)
76 : m_message(message), m_request(request){}
77 public:
78 ISerializeMessage* m_message = nullptr;
79 Request m_request;
80 };
81
82 public:
83
84 explicit HybridSerializeMessageList(HybridParallelMng* mpm)
85 : m_parallel_mng(mpm), m_trace(mpm->traceMng())
86 {
87 }
88
89 public:
90
91 void addMessage(ISerializeMessage* msg) override
92 {
93 m_messages_to_process.add(msg);
94
95 }
96 void processPendingMessages() override
97 {
98 }
99
101 {
102 switch(wait_type){
103 case Parallel::WaitAll:
104 // Pour l'instant seul le mode bloquant est supporté.
105 //m_parallel_mng->processMessages(m_messages_to_process);
106 _wait(Parallel::WaitAll);
107 m_messages_to_process.clear();
108 return (-1);
112 ARCANE_THROW(NotImplementedException,"WaitSomeNonBlocking");
113 }
114 return (-1);
115 }
116
117 private:
118
119 HybridParallelMng* m_parallel_mng;
120 ITraceMng* m_trace;
121 UniqueArray<ISerializeMessage*> m_messages_to_process;
122
123 void _waitAll();
124 void _wait(Parallel::eWaitType wait_mode);
125};
126
127/*---------------------------------------------------------------------------*/
128/*---------------------------------------------------------------------------*/
129
130void HybridSerializeMessageList::
131_wait(Parallel::eWaitType wait_mode)
132{
133 m_trace->info() << "BEGIN PROCESS MESSAGES";
134
135 // TODO: gérer la memoire sans faire de new.
136 ConstArrayView<ISerializeMessage*> messages = m_messages_to_process;
137 HybridMessageQueue* message_queue = m_parallel_mng->m_message_queue;
138 UniqueArray<Request> all_requests;
139 MessageTag HYBRID_MESSAGE_TAG(511);
140 for( ISerializeMessage* sm : messages ){
141 ISerializer* s = sm->serializer();
142 MessageRank orig(sm->source());
143 MessageRank dest(sm->destination());
144 PointToPointMessageInfo message_info(orig,dest,HYBRID_MESSAGE_TAG,Parallel::NonBlocking);
145 Request r;
146 if (sm->isSend())
147 r = message_queue->addSend(message_info,SendBufferInfo(s));
148 else
149 r = message_queue->addReceive(message_info,ReceiveBufferInfo(s));
150 all_requests.add(r);
151 }
152
153 if (wait_mode==Parallel::WaitAll)
154 message_queue->waitAll(all_requests);
155
156 for( ISerializeMessage* sm : messages )
157 sm->setFinished(true);
158}
159
160/*---------------------------------------------------------------------------*/
161/*---------------------------------------------------------------------------*/
162
163/*---------------------------------------------------------------------------*/
164/*---------------------------------------------------------------------------*/
165
167: public ParallelMngInternal
168{
169 public:
170
171 explicit Impl(HybridParallelMng* pm, HybridMachineShMemWinBaseInternalCreator* window_creator)
172 : ParallelMngInternal(pm)
173 , m_parallel_mng(pm)
174 , m_window_creator(window_creator)
175 , m_alloc(makeRef(new MachineShMemWinMemoryAllocator(pm)))
176 {}
177
178 ~Impl() override = default;
179
180 public:
181
183 {
184 return m_parallel_mng->m_mpi_parallel_mng->commRank() * m_parallel_mng->m_local_nb_rank;
185 }
187 {
188 return m_parallel_mng->m_local_nb_rank;
189 }
190
192 {
193 m_parallel_mng->traceMng()->debug() << "initializeWindowCreator() Hybrid";
194 m_window_creator->initializeMpiWindowCreator(m_parallel_mng->commRank(), m_parallel_mng->mpiParallelMng());
195 }
196
198 {
199 if (m_shmem_available == 1) {
200 return true;
201 }
202
203 if (m_shmem_available == 0) {
204 Ref<IParallelTopology> topo = m_parallel_mng->_internalUtilsFactory()->createTopology(m_parallel_mng);
205 if (topo->machineRanks().size() == m_window_creator->machineRanks().size()) {
206 m_shmem_available = 1;
207 return true;
208 }
209 // Problème avec MPI. Peut intervenir si MPICH est compilé en mode ch3:sock.
210 m_shmem_available = 2;
211 return false;
212 }
213
214 return false;
215 }
216
218 {
219 return makeRef(m_window_creator->createWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
220 }
221
223 {
224 return makeRef(m_window_creator->createDynamicWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
225 }
226
228 {
229 return MemoryAllocationOptions{ m_alloc.get() };
230 }
231
233 {
234 return m_window_creator->machineRanks();
235 }
236
237 void machineBarrier() override
238 {
239 m_window_creator->machineBarrier(m_parallel_mng->commRank(), m_parallel_mng->mpiParallelMng());
240 }
241
242 private:
243
244 HybridParallelMng* m_parallel_mng;
247
248 // 0 = Attribut non initialisé
249 // 1 = Mémoire partagée dispo
250 // 2 = Mémoire partagée non dispo
251 Int8 m_shmem_available = 0;
252};
253
254/*---------------------------------------------------------------------------*/
255/*---------------------------------------------------------------------------*/
256
257/*---------------------------------------------------------------------------*/
258/*---------------------------------------------------------------------------*/
259
260HybridParallelMng::
261HybridParallelMng(const HybridParallelMngBuildInfo& bi)
262: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.local_rank,bi.local_nb_rank))
263, m_trace(bi.trace_mng)
264, m_thread_mng(bi.thread_mng)
265, m_world_parallel_mng(bi.world_parallel_mng)
266, m_io_mng(nullptr)
267, m_timer_mng(nullptr)
268, m_replication(new ParallelReplication())
269, m_message_queue(new HybridMessageQueue(bi.message_queue,bi.mpi_parallel_mng,bi.local_nb_rank))
270, m_is_initialized(false)
271, m_stat(Parallel::createDefaultStat())
272, m_thread_barrier(bi.thread_barrier)
273, m_mpi_parallel_mng(bi.mpi_parallel_mng)
274, m_all_dispatchers(bi.all_dispatchers)
275, m_sub_builder_factory(bi.sub_builder_factory)
276, m_parent_container_ref(bi.container)
277, m_utils_factory(createRef<ParallelMngUtilsFactoryBase>())
278, m_parallel_mng_internal(new Impl(this, bi.window_creator))
279{
280 if (!m_world_parallel_mng)
281 m_world_parallel_mng = this;
282
283 // TODO: vérifier que tous les autres HybridParallelMng ont bien
284 // le même nombre de rang locaux (m_local_nb_rank)
285 m_local_rank = bi.local_rank;
286 m_local_nb_rank = bi.local_nb_rank;
287
288 Int32 mpi_rank = m_mpi_parallel_mng->commRank();
289 Int32 mpi_size = m_mpi_parallel_mng->commSize();
290
293
294 m_is_parallel = m_global_nb_rank!=1;
295}
296
297/*---------------------------------------------------------------------------*/
298/*---------------------------------------------------------------------------*/
299
300HybridParallelMng::
301~HybridParallelMng()
302{
303 m_sequential_parallel_mng.reset();
304 delete m_replication;
305 delete m_io_mng;
306 delete m_message_queue;
307 delete m_timer_mng;
308 delete m_stat;
309 delete m_mpi_parallel_mng;
310 delete m_parallel_mng_internal;
311}
312
313/*---------------------------------------------------------------------------*/
314/*---------------------------------------------------------------------------*/
315
316namespace
317{
318// Classe pour créer les différents dispatchers
319class DispatchCreator
320{
321 public:
322 DispatchCreator(ITraceMng* tm,HybridParallelMng* mpm,HybridMessageQueue* message_queue,MpiThreadAllDispatcher* all_dispatchers)
323 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue), m_all_dispatchers(all_dispatchers){}
324 public:
325 template<typename DataType> HybridParallelDispatch<DataType>*
326 create()
327 {
328 HybridMessageQueue* tmq = m_message_queue;
329 MpiThreadAllDispatcher* ad = m_all_dispatchers;
330 auto field = ad->instance((DataType*)nullptr).view();
331 return new HybridParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
332 }
333
334 ITraceMng* m_tm;
335 HybridParallelMng* m_mpm;
336 HybridMessageQueue* m_message_queue;
337 MpiThreadAllDispatcher* m_all_dispatchers;
338};
339}
340
341/*---------------------------------------------------------------------------*/
342/*---------------------------------------------------------------------------*/
343
345build()
346{
347 ITraceMng* tm = traceMng();
348 tm->info() << "Initialise HybridParallelMng"
349 << " global_rank=" << m_global_rank
350 << " local_rank=" << m_local_rank
351 << " mpi_rank=" << m_mpi_parallel_mng->commRank();
352
353 m_timer_mng = new TimerMng(tm);
354
355 // Créé le gestionnaire séquentiel associé.
356 {
358 bi.setTraceMng(traceMng());
359 bi.setCommunicator(communicator());
360 bi.setThreadMng(threadMng());
361 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
362 }
363
364 DispatchCreator creator(m_trace,this,m_message_queue,m_all_dispatchers);
365 this->createDispatchers(creator);
366 m_io_mng = arcaneCreateIOMng(this);
367
368 m_parallel_mng_internal->initializeWindowCreator();
369}
370
371/*---------------------------------------------------------------------------*/
372/*---------------------------------------------------------------------------*/
373
374/*----------------------------------------------------------------------------*/
375/*---------------------------------------------------------------------------*/
376
379{
380 Trace::Setter mci(m_trace,"Thread");
381 if (m_is_initialized){
382 m_trace->warning() << "HybridParallelMng already initialized";
383 return;
384 }
385
386 m_is_initialized = true;
387}
388
389/*---------------------------------------------------------------------------*/
390/*---------------------------------------------------------------------------*/
391
392SerializeBuffer* HybridParallelMng::
393_castSerializer(ISerializer* serializer)
394{
395 auto sbuf = dynamic_cast<SerializeBuffer*>(serializer);
396 if (!sbuf)
397 ARCANE_THROW(ArgumentException,"can not cast 'ISerializer' to 'SerializeBuffer'");
398 return sbuf;
399}
400
401/*---------------------------------------------------------------------------*/
402/*---------------------------------------------------------------------------*/
403
406{
407 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
408}
409
412{
413 return m_utils_factory->createTransferValuesOperation(this)._release();
414}
415
418{
419 return m_utils_factory->createExchanger(this)._release();
420}
421
422/*---------------------------------------------------------------------------*/
423/*---------------------------------------------------------------------------*/
424
425/*---------------------------------------------------------------------------*/
426/*---------------------------------------------------------------------------*/
427
428void HybridParallelMng::
429sendSerializer(ISerializer* s,Int32 rank)
430{
431 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
432 Request r = m_message_queue->addSend(p2p_message,s);
433 m_message_queue->waitAll(ArrayView<Request>(1,&r));
434}
435
436/*---------------------------------------------------------------------------*/
437/*---------------------------------------------------------------------------*/
438
439auto HybridParallelMng::
440sendSerializer(ISerializer* s,Int32 rank,ByteArray& bytes) -> Request
441{
442 ARCANE_UNUSED(bytes);
443 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
444 return m_message_queue->addSend(p2p_message,s);
445}
446
447/*---------------------------------------------------------------------------*/
448/*---------------------------------------------------------------------------*/
449
452{
453 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
454}
455
456/*---------------------------------------------------------------------------*/
457/*---------------------------------------------------------------------------*/
458
459void HybridParallelMng::
460broadcastSerializer(ISerializer* values,Int32 rank)
461{
462 Timer::Phase tphase(timeStats(),TP_Communication);
463 SerializeBuffer* sbuf = _castSerializer(values);
464
465 bool is_broadcaster = (rank==commRank());
466
467 // Effectue l'envoie en deux phases. Envoie d'abord le nombre d'éléments
468 // puis envoie les éléments.
469 // TODO: il serait possible de le faire en une fois pour les messages
470 // ne dépassant pas une certaine taille.
471
473 if (is_broadcaster){
474 Int64 total_size = sbuf->totalSize();
475 Span<Byte> bytes = sbuf->globalBuffer();
476 this->broadcast(Int64ArrayView(1,&total_size),rank);
477 mpBroadcast(mpm,bytes,rank);
478 }
479 else{
480 Int64 total_size = 0;
481 this->broadcast(Int64ArrayView(1,&total_size),rank);
482 sbuf->preallocate(total_size);
483 Span<Byte> bytes = sbuf->globalBuffer();
484 mpBroadcast(mpm,bytes,rank);
485 sbuf->setFromSizes();
486 }
487}
488
489/*---------------------------------------------------------------------------*/
490/*---------------------------------------------------------------------------*/
491
492void HybridParallelMng::
493recvSerializer(ISerializer* s,Int32 rank)
494{
495 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
496 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
497 m_message_queue->waitAll(ArrayView<Request>(1,&r));
498}
499
500/*---------------------------------------------------------------------------*/
501/*---------------------------------------------------------------------------*/
502
505{
506 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
507}
508
509/*---------------------------------------------------------------------------*/
510/*---------------------------------------------------------------------------*/
511
514{
515 ARCANE_UNUSED(requests);
516 throw NotImplementedException(A_FUNCINFO);
517}
518
519/*---------------------------------------------------------------------------*/
520/*---------------------------------------------------------------------------*/
521
523probe(const PointToPointMessageInfo& message)
524{
525 PointToPointMessageInfo p2p_message(message);
527 return m_message_queue->probe(p2p_message);
528}
529
530/*---------------------------------------------------------------------------*/
531/*---------------------------------------------------------------------------*/
532
535{
536 PointToPointMessageInfo p2p_message(message);
538 return m_message_queue->legacyProbe(p2p_message);
539}
540
541/*---------------------------------------------------------------------------*/
542/*---------------------------------------------------------------------------*/
543
544Request HybridParallelMng::
545sendSerializer(const ISerializer* s,const PointToPointMessageInfo& message)
546{
547 auto p2p_message = buildMessage(message);
548 return m_message_queue->addSend(p2p_message,s);
549}
550
551/*---------------------------------------------------------------------------*/
552/*---------------------------------------------------------------------------*/
553
554Request HybridParallelMng::
555receiveSerializer(ISerializer* s,const PointToPointMessageInfo& message)
556{
557 auto p2p_message = buildMessage(message);
558 return m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
559}
560
561/*---------------------------------------------------------------------------*/
562/*---------------------------------------------------------------------------*/
563
566{
567 if (m_stat)
568 m_stat->print(m_trace);
569}
570
571/*---------------------------------------------------------------------------*/
572/*---------------------------------------------------------------------------*/
573
575barrier()
576{
577 m_thread_barrier->wait();
578 if (m_local_rank==0)
579 m_mpi_parallel_mng->barrier();
580 m_thread_barrier->wait();
581}
582
583/*---------------------------------------------------------------------------*/
584/*---------------------------------------------------------------------------*/
585
586ISerializeMessageList* HybridParallelMng::
587_createSerializeMessageList()
588{
589 auto* x = new MP::internal::SerializeMessageList(messagePassingMng());
590 x->setAllowAnyRankReceive(false);
591 return x;
592}
593
594/*---------------------------------------------------------------------------*/
595/*---------------------------------------------------------------------------*/
596
599{
600 return m_utils_factory->createSynchronizer(this,family)._release();
601}
602
603/*---------------------------------------------------------------------------*/
604/*---------------------------------------------------------------------------*/
605
607createSynchronizer(const ItemGroup& group)
608{
609 return m_utils_factory->createSynchronizer(this,group)._release();
610}
611
612/*---------------------------------------------------------------------------*/
613/*---------------------------------------------------------------------------*/
614
617{
618 return m_utils_factory->createTopology(this)._release();
619}
620
621/*---------------------------------------------------------------------------*/
622/*---------------------------------------------------------------------------*/
623
625replication() const
626{
627 return m_replication;
628}
629
630/*---------------------------------------------------------------------------*/
631/*---------------------------------------------------------------------------*/
632
635{
636 delete m_replication;
637 m_replication = v;
638}
639
640/*---------------------------------------------------------------------------*/
641/*---------------------------------------------------------------------------*/
642
645{
646 return m_sequential_parallel_mng.get();
647}
648
649/*---------------------------------------------------------------------------*/
650/*---------------------------------------------------------------------------*/
651
652Ref<IParallelMng> HybridParallelMng::
653sequentialParallelMngRef()
654{
655 return m_sequential_parallel_mng;
656}
657
658/*---------------------------------------------------------------------------*/
659/*---------------------------------------------------------------------------*/
665{
667 public:
668 RequestList(HybridParallelMng* pm)
669 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
670 m_local_rank(m_parallel_mng->localRank()) {}
671 public:
672 void _wait(Parallel::eWaitType wait_type) override
673 {
674 switch(wait_type){
675 case Parallel::WaitAll:
676 m_parallel_mng->m_message_queue->waitAll(_requests());
677 break;
679 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),false);
680 break;
682 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),true);
683 }
684 }
685 private:
686 HybridParallelMng* m_parallel_mng;
687 HybridMessageQueue* m_message_queue;
689};
690
691/*---------------------------------------------------------------------------*/
692/*---------------------------------------------------------------------------*/
693
700
701/*---------------------------------------------------------------------------*/
702/*---------------------------------------------------------------------------*/
703
706{
707 m_message_queue->waitAll(requests);
708}
709
710/*---------------------------------------------------------------------------*/
711/*---------------------------------------------------------------------------*/
712
715{
716 return m_mpi_parallel_mng->getMPICommunicator();
717}
718
719/*---------------------------------------------------------------------------*/
720/*---------------------------------------------------------------------------*/
721
722MP::Communicator HybridParallelMng::
723communicator() const
724{
725 return m_mpi_parallel_mng->communicator();
726}
727
728/*---------------------------------------------------------------------------*/
729/*---------------------------------------------------------------------------*/
730
731MP::Communicator HybridParallelMng::
733{
734 return m_mpi_parallel_mng->machineCommunicator();
735}
736
737/*---------------------------------------------------------------------------*/
738/*---------------------------------------------------------------------------*/
739
742{
743 PointToPointMessageInfo p2p_message(message);
744 p2p_message.setEmiterRank(MessageRank(m_global_rank));
745 return p2p_message;
746}
747
748/*---------------------------------------------------------------------------*/
749/*---------------------------------------------------------------------------*/
750
753{
754 return buildMessage({MessageRank(dest),blocking_mode});
755}
756
757/*---------------------------------------------------------------------------*/
758/*---------------------------------------------------------------------------*/
759
760IParallelMng* HybridParallelMng::
761_createSubParallelMng(Int32ConstArrayView kept_ranks)
762{
763 ARCANE_UNUSED(kept_ranks);
764 ARCANE_THROW(NotSupportedException,"Use createSubParallelMngRef() instead");
765}
766
767/*---------------------------------------------------------------------------*/
768/*---------------------------------------------------------------------------*/
769
772{
773 // ATTENTION: Cette méthode est appelée simultanément par tous les threads
774 // partageant cet HybridParallelMng.
775
776 if (kept_ranks.empty())
777 ARCANE_FATAL("kept_ranks is empty");
778 ARCANE_CHECK_POINTER(m_sub_builder_factory);
779
780 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF";
781
782 /*
783 Il existe plusieurs possibilités:
784 1. on réduit juste le nombre de rangs en mémoire partagé pour chaque
785 processus MPI -< on créé un HybridParallelMng
786 2. On ne garde que le rang maitre de chaque processus MPI -> on créé un MpiParallelMng.
787 3. On ne garde que les rangs d'un même processus -> on créé un SharedMemoryParallelMng
788 4. On ne garde qu'un seul rang: on crée un MpiSequentialParallelMng.
789 */
790 // Pour l'instant, on ne supporte que le cas 1 et 2.
791 Int32 nb_kept_rank = kept_ranks.size();
792
793 // Détermine le nouveau nombre de rangs locaux par rang MPI.
794
795 // Regarde si je suis dans les listes des rangs conservés et si oui
796 // détermine mon rang dans le IParallelMng créé
797 Int32 first_global_rank_in_this_mpi = m_global_rank - m_local_rank;
798 Int32 last_global_rank_in_this_mpi = first_global_rank_in_this_mpi + m_local_nb_rank - 1;
799 // Mon nouveau rang local. Négatif si je ne suis pas dans le nouveau communicateur
800 Int32 my_new_global_rank = (-1);
801 Int32 new_local_nb_rank = 0;
802 Int32 my_new_local_rank = (-1);
803 for( Integer i=0; i<nb_kept_rank; ++i ){
804 Int32 kept_rank = kept_ranks[i];
805 if (kept_rank>=first_global_rank_in_this_mpi && kept_rank<last_global_rank_in_this_mpi)
806 ++new_local_nb_rank;
807 if (kept_rank==m_global_rank){
808 my_new_global_rank = i;
809 my_new_local_rank = new_local_nb_rank - 1;
810 }
811 }
812 bool has_new_rank = (my_new_global_rank != (-1));
813
814 // Calcule le min, le max et la somme sur tous les rangs du nombre de nouveaux.
815 // Deux cas peuvent se présenter:
816 // 1. Le min et le max sont égaux et supérieurs ou égaux à 2: Dans ce cas on créé
817 // un HybridParallelMng.
818 // 2. Le max vaut 1. Dans ce cas on créé un nouveau IParallelMng via le MpiParallelMng.
819 // Les rangs actuels pour lequels 'new_local_nb_rank' vaut 0 ne seront pas dans ce
820 // nouveau communicateur. Ce cas concerne aussi le cas où il ne reste plus qu'un
821 // seul rang à la fin.
822
823 Int32 min_new_local_nb_rank = -1;
824 Int32 max_new_local_nb_rank = -1;
825 Int32 sum_new_local_nb_rank = -1;
826 Int32 min_rank = A_NULL_RANK;
827 Int32 max_rank = A_NULL_RANK;
828 computeMinMaxSum(new_local_nb_rank,min_new_local_nb_rank,max_new_local_nb_rank,
829 sum_new_local_nb_rank,min_rank,max_rank);
830
831 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF new_local_nb_rank=" << new_local_nb_rank
832 << " min=" << min_new_local_nb_rank
833 << " max=" << max_new_local_nb_rank
834 << " sum=" << sum_new_local_nb_rank
835 << " new_global_rank=" << my_new_global_rank;
836
837 // S'il ne reste qu'un seul rang local, alors on construit uniquement un MpiParallelMng.
838 // Seul le PE qui a un nouveau rang est concerné et fait cela
839 if (max_new_local_nb_rank==1){
840 Integer nb_mpi_rank = m_mpi_parallel_mng->commSize();
841 // Il faut calculer les nouveaux rangs MPI.
842 // Si 'min_new_local_nb_rank' vaut 1, alors c'est simple car cela signifie qu'on garde
843 // tous les rangs MPI actuels (on fait l'équivalent d'un MPI_Comm_dup). Sinon, on
844 // récupère pour chaque rang MPI s'il sera dans le nouveau communicateur et on construit
845 // la liste des rangs conservés en fonction de cela.
846 // NOTE: dans tous les cas il faut faire attention qu'un seul thread utilise le
847 // 'm_mpi_parallel_mng'.
848 UniqueArray<Int32> kept_mpi_ranks;
850 bool do_mpi_call = false;
851 if (min_new_local_nb_rank==1){
852 if (has_new_rank){
853 do_mpi_call = true;
854 kept_mpi_ranks.resize(nb_mpi_rank);
855 for( Int32 x=0; x<nb_mpi_rank; ++x )
856 kept_mpi_ranks[x] = x;
857 }
858 }
859 else{
860 // Si je ne suis pas dans le nouveau communicateur, c'est le rang local 0 qui
861 // faut le 'gather'.
862 UniqueArray<Int16> gathered_ranks(nb_mpi_rank);
863 if (has_new_rank || m_local_rank==0){
864 do_mpi_call = true;
865 Int16 v = (has_new_rank) ? 1 : 0;
866 m_mpi_parallel_mng->allGather(ArrayView<Int16>(1,&v),gathered_ranks);
867 }
868 for( Int32 x=0; x<nb_mpi_rank; ++x )
869 if (gathered_ranks[x]==1)
870 kept_mpi_ranks.add(x);
871 }
872 if (do_mpi_call)
873 return m_mpi_parallel_mng->createSubParallelMngRef(kept_mpi_ranks);
874 else
875 return Ref<IParallelMng>();
876 }
877
878 if (max_new_local_nb_rank!=new_local_nb_rank)
879 ARCANE_FATAL("Not same number of new local ranks on every MPI processus: current={0} max={1}",
880 new_local_nb_rank,max_new_local_nb_rank);
881
882 if (max_new_local_nb_rank<2)
883 ARCANE_FATAL("number of local ranks is too low current={0} minimum=2",new_local_nb_rank);
884
885 // Met une barrière locale pour être sur que tout le monde attend ici.
886 m_thread_barrier->wait();
887
888 // NOTE: Le builder contient les parties communes aux IParallelMng créés. Il faut
889 // donc que ces derniers gardent une référence dessus sinon il sera détruit à la fin
890 // de cette méthode.
892
893 // Le rang 0 créé le builder
894 if (m_local_rank==0){
895 // Suppose qu'on à le même nombre de rangs MPI qu'avant donc on utilise
896 // le communicateur MPI qu'on a déjà.
897 MP::Communicator c = communicator();
898 MP::Communicator mc = machineCommunicator();
899 builder = m_sub_builder_factory->_createParallelMngBuilder(new_local_nb_rank, c, mc);
900 // Positionne le builder pour tout le monde
901 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
902 }
903 // Attend pour être sur que tous les threads voit le bon builder.
904 m_thread_barrier->wait();
905
906 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
907 ARCANE_CHECK_POINTER(builder.get());
908
909 Ref<IParallelMng> new_parallel_mng;
910 if (my_new_local_rank>=0){
911 new_parallel_mng = builder->_createParallelMng(my_new_local_rank,traceMng());
912 }
913 m_thread_barrier->wait();
914
915 // Ici, tout le monde a créé son IParallelMng. On peut donc
916 // supprimer la référence au builder. Les IParallelMng créés gardent
917 // une référence au builder
918 if (m_local_rank==0){
919 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
920 }
921 m_thread_barrier->wait();
922
923 return new_parallel_mng;
924}
925
926/*---------------------------------------------------------------------------*/
927/*---------------------------------------------------------------------------*/
928
931{
932 return m_utils_factory;
933}
934
935/*---------------------------------------------------------------------------*/
936/*---------------------------------------------------------------------------*/
937
938bool HybridParallelMng::
939_isAcceleratorAware() const
940{
941 return m_mpi_parallel_mng->_internalApi()->isAcceleratorAware();
942}
943
944/*---------------------------------------------------------------------------*/
945/*---------------------------------------------------------------------------*/
946
947} // End namespace Arcane::MessagePassing
948
949/*---------------------------------------------------------------------------*/
950/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Exception lorsqu'un argument est invalide.
Vue modifiable d'un tableau d'un type T.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
constexpr bool empty() const noexcept
true si le tableau est vide (size()==0)
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:36
Interface d'une famille d'entités.
Definition IItemFamily.h:84
Échange d'informations entre processeurs.
virtual bool isAcceleratorAware() const =0
Indique si l'implémentation gère les accélérateurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface du gestionnaire de traces.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Interface d'une file de messages avec les threads.
void machineBarrier() override
Méthode permettant de faire une barrière pour les sous-domaines du noeud de calcul.
void initializeWindowCreator() override
Méthode permettant d'initialiser le windowCreator spécifique à l'implémentation.
MemoryAllocationOptions machineShMemWinMemoryAllocator() override
Méthode permettant de récupérer un allocateur en mémoire partagée.
bool isMachineShMemWinAvailable() override
Méthode permettant de savoir si le mode mémoire partagée est supporté.
ConstArrayView< Int32 > machineRanks() override
Méthode permettant de récupérer les rangs des sous-domaines du noeud de calcul.
Ref< IContigMachineShMemWinBaseInternal > createContigMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Ref< IMachineShMemWinBaseInternal > createMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire dynamique sur le noeud.
Implémentation de IRequestList pour HybridParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant les threads.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
void freeRequests(ArrayView< Request > requests) override
Libère les requêtes.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void barrier() override
Effectue une barière.
MP::Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
bool m_is_initialized
true si déjà initialisé
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void build() override
Construit l'instance.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
void initialize() override
Initialise le gestionnaire du parallélisme.
IThreadMng * threadMng() const override
Gestionnaire de threads.
Int32 m_global_nb_rank
Nombre de rangs globaux.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
Int32 m_local_rank
Rang local du processeur actuel.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
IParallelReplication * replication() const override
Informations sur la réplication.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
MP::Communicator machineCommunicator() const override
Communicateur MPI issus du communicateur communicator() réunissant tous les processus du noeud de cal...
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Int32 m_local_nb_rank
Nombre de rang locaux.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
Int32 commRank() const override
Rang de cette instance dans le communicateur.
Int32 m_global_rank
Numéro du processeur actuel.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Integer waitMessages(Parallel::eWaitType wait_type) override
Attend que les messages aient terminé leur exécution.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
Interface du gestionnaire des échanges de messages.
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
void setEmiterRank(MessageRank rank)
Positionne le rang de l'émetteur du message.
Informations des buffers de réception.
Requête d'un message.
Definition Request.h:77
Classe de base d'une liste de requêtes.
IParallelMngInternal * _internalApi() override
API interne à Arcane.
Exception lorsqu'une fonction n'est pas implémentée.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Implémentation d'un tampon pour la sérialisation.
Vue d'un tableau d'éléments de type T.
Definition Span.h:633
Gestionnaire de timer.
Definition TimerMng.h:39
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
C void mpBroadcast(IMessagePassingMng *pm, Span< char > send_buf, Int32 rank)
@ WaitSome
Attend que tous les messages de la liste soient traités.
eBlockingType
Type indiquant si un message est bloquant ou non.
Implémentation de la concurrence.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
std::int8_t Int8
Type entier signé sur 8 bits.
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:451
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:121
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:482
std::int16_t Int16
Type entier signé sur 16 bits.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un HybridParallelMng.
Infos pour construire un SequentialParallelMng.