Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
HybridParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* HybridParallelMng.cc (C) 2000-2026 */
9/* */
10/* Parallelism manager using a mix of MPI/Threads. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/mpithread/HybridParallelMng.h"
15
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/NumericTypes.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/IThreadBarrier.h"
22#include "arcane/utils/ITraceMng.h"
23
24#include "arcane/core/IIOMng.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ISerializeMessageList.h"
27#include "arcane/core/IItemFamily.h"
28#include "arcane/core/IParallelTopology.h"
29#include "arcane/core/internal/ParallelMngInternal.h"
30#include "arcane/core/internal/SerializeMessage.h"
31#include "arcane/core/internal/MachineShMemWinMemoryAllocator.h"
32#include "arcane/core/parallel/IStat.h"
33
34#include "arcane/parallel/mpithread/HybridParallelDispatch.h"
35#include "arcane/parallel/mpithread/HybridMessageQueue.h"
36#include "arcane/parallel/mpithread/internal/HybridMachineShMemWinBaseInternalCreator.h"
37#include "arcane/parallel/mpithread/internal/HybridContigMachineShMemWinBaseInternal.h"
38#include "arcane/parallel/mpithread/internal/HybridMachineShMemWinBaseInternal.h"
39
40#include "arcane/parallel/mpi/MpiParallelMng.h"
41
42#include "arcane/impl/TimerMng.h"
43#include "arcane/impl/ParallelReplication.h"
44#include "arcane/impl/SequentialParallelMng.h"
45#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
46
48#include "arccore/message_passing/RequestListBase.h"
49#include "arccore/message_passing/internal/SerializeMessageList.h"
50
51/*---------------------------------------------------------------------------*/
52/*---------------------------------------------------------------------------*/
53
54namespace Arcane
55{
56extern "C++" IIOMng*
57arcaneCreateIOMng(IParallelMng* psm);
58}
59
61{
62
63/*---------------------------------------------------------------------------*/
64/*---------------------------------------------------------------------------*/
65
66// NOTE: This class is no longer used. It remains for reference
67// and will be removed later
68class HybridSerializeMessageList
70{
71 public:
72
73 class HybridSerializeMessageRequest
74 {
75 public:
76
77 HybridSerializeMessageRequest(ISerializeMessage* message, Request request)
78 : m_message(message)
79 , m_request(request)
80 {}
81
82 public:
83
84 ISerializeMessage* m_message = nullptr;
85 Request m_request;
86 };
87
88 public:
89
90 explicit HybridSerializeMessageList(HybridParallelMng* mpm)
91 : m_parallel_mng(mpm)
92 , m_trace(mpm->traceMng())
93 {
94 }
95
96 public:
97
98 void addMessage(ISerializeMessage* msg) override
99 {
100 m_messages_to_process.add(msg);
101 }
103 {
104 }
105
107 {
108 switch (wait_type) {
109 case Parallel::WaitAll:
110 // Currently, only the blocking mode is supported.
111 //m_parallel_mng->processMessages(m_messages_to_process);
112 _wait(Parallel::WaitAll);
113 m_messages_to_process.clear();
114 return (-1);
118 ARCANE_THROW(NotImplementedException, "WaitSomeNonBlocking");
119 }
120 return (-1);
121 }
122
123 private:
124
125 HybridParallelMng* m_parallel_mng;
126 ITraceMng* m_trace;
127 UniqueArray<ISerializeMessage*> m_messages_to_process;
128
129 void _waitAll();
130 void _wait(Parallel::eWaitType wait_mode);
131};
132
133/*---------------------------------------------------------------------------*/
134/*---------------------------------------------------------------------------*/
135
136void HybridSerializeMessageList::
137_wait(Parallel::eWaitType wait_mode)
138{
139 m_trace->info() << "BEGIN PROCESS MESSAGES";
140
141 // TODO: manage memory without using new.
142 ConstArrayView<ISerializeMessage*> messages = m_messages_to_process;
143 HybridMessageQueue* message_queue = m_parallel_mng->m_message_queue;
144 UniqueArray<Request> all_requests;
145 MessageTag HYBRID_MESSAGE_TAG(511);
146 for (ISerializeMessage* sm : messages) {
147 ISerializer* s = sm->serializer();
148 MessageRank orig(sm->source());
149 MessageRank dest(sm->destination());
150 PointToPointMessageInfo message_info(orig, dest, HYBRID_MESSAGE_TAG, Parallel::NonBlocking);
151 Request r;
152 if (sm->isSend())
153 r = message_queue->addSend(message_info, SendBufferInfo(s));
154 else
155 r = message_queue->addReceive(message_info, ReceiveBufferInfo(s));
156 all_requests.add(r);
157 }
158
159 if (wait_mode == Parallel::WaitAll)
160 message_queue->waitAll(all_requests);
161
162 for (ISerializeMessage* sm : messages)
163 sm->setFinished(true);
164}
165
166/*---------------------------------------------------------------------------*/
167/*---------------------------------------------------------------------------*/
168
169/*---------------------------------------------------------------------------*/
170/*---------------------------------------------------------------------------*/
171
173: public ParallelMngInternal
174{
175 public:
176
177 explicit Impl(HybridParallelMng* pm, HybridMachineShMemWinBaseInternalCreator* window_creator)
178 : ParallelMngInternal(pm)
179 , m_parallel_mng(pm)
180 , m_window_creator(window_creator)
181 , m_alloc(makeRef(new MachineShMemWinMemoryAllocator(pm)))
182 {}
183
184 ~Impl() override = default;
185
186 public:
187
189 {
190 return m_parallel_mng->m_mpi_parallel_mng->commRank() * m_parallel_mng->m_local_nb_rank;
191 }
193 {
194 return m_parallel_mng->m_local_nb_rank;
195 }
196
198 {
199 m_parallel_mng->traceMng()->debug() << "initializeWindowCreator() Hybrid";
200 m_window_creator->initializeMpiWindowCreator(m_parallel_mng->commRank(), m_parallel_mng->mpiParallelMng());
201 }
202
204 {
205 if (m_shmem_available == 1) {
206 return true;
207 }
208
209 if (m_shmem_available == 0) {
210 Ref<IParallelTopology> topo = m_parallel_mng->_internalUtilsFactory()->createTopology(m_parallel_mng);
211 if (topo->machineRanks().size() == m_window_creator->machineRanks().size()) {
212 m_shmem_available = 1;
213 return true;
214 }
215 // Issue with MPI. May occur if MPICH is compiled in ch3:sock mode.
216 m_shmem_available = 2;
217 return false;
218 }
219
220 return false;
221 }
222
224 {
225 return makeRef(m_window_creator->createWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
226 }
227
229 {
230 return makeRef(m_window_creator->createDynamicWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
231 }
232
234 {
235 return MemoryAllocationOptions{ m_alloc.get() };
236 }
237
239 {
240 return m_window_creator->machineRanks();
241 }
242
243 void machineBarrier() override
244 {
245 m_window_creator->machineBarrier(m_parallel_mng->commRank(), m_parallel_mng->mpiParallelMng());
246 }
247
248 private:
249
250 HybridParallelMng* m_parallel_mng;
253
254 // 0 = Not initialized attribute
255 // 1 = Shared memory available
256 // 2 = Shared memory not available
257 Int8 m_shmem_available = 0;
258};
259
260/*---------------------------------------------------------------------------*/
261/*---------------------------------------------------------------------------*/
262
263/*---------------------------------------------------------------------------*/
264/*---------------------------------------------------------------------------*/
265
266HybridParallelMng::
267HybridParallelMng(const HybridParallelMngBuildInfo& bi)
268: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.local_rank, bi.local_nb_rank))
269, m_trace(bi.trace_mng)
270, m_thread_mng(bi.thread_mng)
271, m_world_parallel_mng(bi.world_parallel_mng)
272, m_io_mng(nullptr)
273, m_timer_mng(nullptr)
274, m_replication(new ParallelReplication())
275, m_message_queue(new HybridMessageQueue(bi.message_queue, bi.mpi_parallel_mng, bi.local_nb_rank))
276, m_is_initialized(false)
277, m_stat(Parallel::createDefaultStat())
278, m_thread_barrier(bi.thread_barrier)
279, m_mpi_parallel_mng(bi.mpi_parallel_mng)
280, m_all_dispatchers(bi.all_dispatchers)
281, m_sub_builder_factory(bi.sub_builder_factory)
282, m_parent_container_ref(bi.container)
283, m_utils_factory(createRef<ParallelMngUtilsFactoryBase>())
284, m_parallel_mng_internal(new Impl(this, bi.window_creator))
285{
286 if (!m_world_parallel_mng)
287 m_world_parallel_mng = this;
288
289 // TODO: verify that all other HybridParallelMng have the same
290 // number of local ranks (m_local_nb_rank)
291 m_local_rank = bi.local_rank;
292 m_local_nb_rank = bi.local_nb_rank;
293
294 Int32 mpi_rank = m_mpi_parallel_mng->commRank();
295 Int32 mpi_size = m_mpi_parallel_mng->commSize();
296
299
300 m_is_parallel = m_global_nb_rank != 1;
301}
302
303/*---------------------------------------------------------------------------*/
304/*---------------------------------------------------------------------------*/
305
306HybridParallelMng::
307~HybridParallelMng()
308{
309 m_sequential_parallel_mng.reset();
310 delete m_replication;
311 delete m_io_mng;
312 delete m_message_queue;
313 delete m_timer_mng;
314 delete m_stat;
315 delete m_mpi_parallel_mng;
316 delete m_parallel_mng_internal;
317}
318
319/*---------------------------------------------------------------------------*/
320/*---------------------------------------------------------------------------*/
321
322namespace
323{
324 // Class to create the different dispatchers
325 class DispatchCreator
326 {
327 public:
328
329 DispatchCreator(ITraceMng* tm, HybridParallelMng* mpm, HybridMessageQueue* message_queue, MpiThreadAllDispatcher* all_dispatchers)
330 : m_tm(tm)
331 , m_mpm(mpm)
332 , m_message_queue(message_queue)
333 , m_all_dispatchers(all_dispatchers)
334 {}
335
336 public:
337
338 template <typename DataType> HybridParallelDispatch<DataType>*
339 create()
340 {
341 HybridMessageQueue* tmq = m_message_queue;
342 MpiThreadAllDispatcher* ad = m_all_dispatchers;
343 auto field = ad->instance((DataType*)nullptr).view();
344 return new HybridParallelDispatch<DataType>(m_tm, m_mpm, tmq, field);
345 }
346
347 ITraceMng* m_tm;
348 HybridParallelMng* m_mpm;
349 HybridMessageQueue* m_message_queue;
350 MpiThreadAllDispatcher* m_all_dispatchers;
351 };
352} // namespace
353
354/*---------------------------------------------------------------------------*/
355/*---------------------------------------------------------------------------*/
356
358build()
359{
360 ITraceMng* tm = traceMng();
361 tm->info() << "Initialise HybridParallelMng"
362 << " global_rank=" << m_global_rank
363 << " local_rank=" << m_local_rank
364 << " mpi_rank=" << m_mpi_parallel_mng->commRank();
365
366 m_timer_mng = new TimerMng(tm);
367
368 // Created the associated sequential manager.
369 {
371 bi.setTraceMng(traceMng());
372 bi.setCommunicator(communicator());
373 bi.setThreadMng(threadMng());
374 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
375 }
376
377 DispatchCreator creator(m_trace, this, m_message_queue, m_all_dispatchers);
378 this->createDispatchers(creator);
379 m_io_mng = arcaneCreateIOMng(this);
380
381 m_parallel_mng_internal->initializeWindowCreator();
382}
383
384/*---------------------------------------------------------------------------*/
385/*---------------------------------------------------------------------------*/
386
387/*----------------------------------------------------------------------------*/
388/*---------------------------------------------------------------------------*/
389
392{
393 Trace::Setter mci(m_trace, "Thread");
394 if (m_is_initialized) {
395 m_trace->warning() << "HybridParallelMng already initialized";
396 return;
397 }
398
399 m_is_initialized = true;
400}
401
402/*---------------------------------------------------------------------------*/
403/*---------------------------------------------------------------------------*/
404
405SerializeBuffer* HybridParallelMng::
406_castSerializer(ISerializer* serializer)
407{
408 auto sbuf = dynamic_cast<SerializeBuffer*>(serializer);
409 if (!sbuf)
410 ARCANE_THROW(ArgumentException, "can not cast 'ISerializer' to 'SerializeBuffer'");
411 return sbuf;
412}
413
414/*---------------------------------------------------------------------------*/
415/*---------------------------------------------------------------------------*/
416
419{
420 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
421}
422
425{
426 return m_utils_factory->createTransferValuesOperation(this)._release();
427}
428
431{
432 return m_utils_factory->createExchanger(this)._release();
433}
434
435/*---------------------------------------------------------------------------*/
436/*---------------------------------------------------------------------------*/
437
438/*---------------------------------------------------------------------------*/
439/*---------------------------------------------------------------------------*/
440
441void HybridParallelMng::
442sendSerializer(ISerializer* s, Int32 rank)
443{
444 auto p2p_message = buildMessage(rank, Parallel::NonBlocking);
445 Request r = m_message_queue->addSend(p2p_message, s);
446 m_message_queue->waitAll(ArrayView<Request>(1, &r));
447}
448
449/*---------------------------------------------------------------------------*/
450/*---------------------------------------------------------------------------*/
451
452auto HybridParallelMng::
453sendSerializer(ISerializer* s, Int32 rank, ByteArray& bytes) -> Request
454{
455 ARCANE_UNUSED(bytes);
456 auto p2p_message = buildMessage(rank, Parallel::NonBlocking);
457 return m_message_queue->addSend(p2p_message, s);
458}
459
460/*---------------------------------------------------------------------------*/
461/*---------------------------------------------------------------------------*/
462
465{
466 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
467}
468
469/*---------------------------------------------------------------------------*/
470/*---------------------------------------------------------------------------*/
471
472void HybridParallelMng::
473broadcastSerializer(ISerializer* values, Int32 rank)
474{
475 Timer::Phase tphase(timeStats(), TP_Communication);
476 SerializeBuffer* sbuf = _castSerializer(values);
477
478 bool is_broadcaster = (rank == commRank());
479
480 // Send in two phases. First send the number of elements
481 // then send the elements.
482 // TODO: it would be possible to do it in one go for messages
483 // not exceeding a certain size.
484
486 if (is_broadcaster) {
487 Int64 total_size = sbuf->totalSize();
488 Span<Byte> bytes = sbuf->globalBuffer();
489 this->broadcast(Int64ArrayView(1, &total_size), rank);
490 mpBroadcast(mpm, bytes, rank);
491 }
492 else {
493 Int64 total_size = 0;
494 this->broadcast(Int64ArrayView(1, &total_size), rank);
495 sbuf->preallocate(total_size);
496 Span<Byte> bytes = sbuf->globalBuffer();
497 mpBroadcast(mpm, bytes, rank);
498 sbuf->setFromSizes();
499 }
500}
501
502/*---------------------------------------------------------------------------*/
503/*---------------------------------------------------------------------------*/
504
505void HybridParallelMng::
506recvSerializer(ISerializer* s, Int32 rank)
507{
508 auto p2p_message = buildMessage(rank, Parallel::NonBlocking);
509 Request r = m_message_queue->addReceive(p2p_message, ReceiveBufferInfo(s));
510 m_message_queue->waitAll(ArrayView<Request>(1, &r));
511}
512
513/*---------------------------------------------------------------------------*/
514/*---------------------------------------------------------------------------*/
515
518{
519 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
520}
521
522/*---------------------------------------------------------------------------*/
523/*---------------------------------------------------------------------------*/
524
527{
528 ARCANE_UNUSED(requests);
529 throw NotImplementedException(A_FUNCINFO);
530}
531
532/*---------------------------------------------------------------------------*/
533/*---------------------------------------------------------------------------*/
534
536probe(const PointToPointMessageInfo& message)
537{
538 PointToPointMessageInfo p2p_message(message);
540 return m_message_queue->probe(p2p_message);
541}
542
543/*---------------------------------------------------------------------------*/
544/*---------------------------------------------------------------------------*/
545
548{
549 PointToPointMessageInfo p2p_message(message);
551 return m_message_queue->legacyProbe(p2p_message);
552}
553
554/*---------------------------------------------------------------------------*/
555/*---------------------------------------------------------------------------*/
556
557Request HybridParallelMng::
558sendSerializer(const ISerializer* s, const PointToPointMessageInfo& message)
559{
560 auto p2p_message = buildMessage(message);
561 return m_message_queue->addSend(p2p_message, s);
562}
563
564/*---------------------------------------------------------------------------*/
565/*---------------------------------------------------------------------------*/
566
567Request HybridParallelMng::
568receiveSerializer(ISerializer* s, const PointToPointMessageInfo& message)
569{
570 auto p2p_message = buildMessage(message);
571 return m_message_queue->addReceive(p2p_message, ReceiveBufferInfo(s));
572}
573
574/*---------------------------------------------------------------------------*/
575/*---------------------------------------------------------------------------*/
576
579{
580 if (m_stat)
581 m_stat->print(m_trace);
582}
583
584/*---------------------------------------------------------------------------*/
585/*---------------------------------------------------------------------------*/
586
588barrier()
589{
590 m_thread_barrier->wait();
591 if (m_local_rank == 0)
592 m_mpi_parallel_mng->barrier();
593 m_thread_barrier->wait();
594}
595
596/*---------------------------------------------------------------------------*/
597/*---------------------------------------------------------------------------*/
598
599ISerializeMessageList* HybridParallelMng::
600_createSerializeMessageList()
601{
602 auto* x = new MP::internal::SerializeMessageList(messagePassingMng());
603 x->setAllowAnyRankReceive(false);
604 return x;
605}
606
607/*---------------------------------------------------------------------------*/
608/*---------------------------------------------------------------------------*/
609
612{
613 return m_utils_factory->createSynchronizer(this, family)._release();
614}
615
616/*---------------------------------------------------------------------------*/
617/*---------------------------------------------------------------------------*/
618
620createSynchronizer(const ItemGroup& group)
621{
622 return m_utils_factory->createSynchronizer(this, group)._release();
623}
624
625/*---------------------------------------------------------------------------*/
626/*---------------------------------------------------------------------------*/
627
630{
631 return m_utils_factory->createTopology(this)._release();
632}
633
634/*---------------------------------------------------------------------------*/
635/*---------------------------------------------------------------------------*/
636
638replication() const
639{
640 return m_replication;
641}
642
643/*---------------------------------------------------------------------------*/
644/*---------------------------------------------------------------------------*/
645
648{
649 delete m_replication;
650 m_replication = v;
651}
652
653/*---------------------------------------------------------------------------*/
654/*---------------------------------------------------------------------------*/
655
658{
659 return m_sequential_parallel_mng.get();
660}
661
662/*---------------------------------------------------------------------------*/
663/*---------------------------------------------------------------------------*/
664
665Ref<IParallelMng> HybridParallelMng::
666sequentialParallelMngRef()
667{
668 return m_sequential_parallel_mng;
669}
670
671/*---------------------------------------------------------------------------*/
672/*---------------------------------------------------------------------------*/
673
679{
681
682 public:
683
684 RequestList(HybridParallelMng* pm)
685 : m_parallel_mng(pm)
686 , m_message_queue(pm->m_message_queue)
687 , m_local_rank(m_parallel_mng->localRank())
688 {}
689
690 public:
691
692 void _wait(Parallel::eWaitType wait_type) override
693 {
694 switch (wait_type) {
695 case Parallel::WaitAll:
696 m_parallel_mng->m_message_queue->waitAll(_requests());
697 break;
699 m_message_queue->waitSome(m_local_rank, _requests(), _requestsDone(), false);
700 break;
702 m_message_queue->waitSome(m_local_rank, _requests(), _requestsDone(), true);
703 }
704 }
705
706 private:
707
708 HybridParallelMng* m_parallel_mng;
709 HybridMessageQueue* m_message_queue;
711};
712
713/*---------------------------------------------------------------------------*/
714/*---------------------------------------------------------------------------*/
715
722
723/*---------------------------------------------------------------------------*/
724/*---------------------------------------------------------------------------*/
725
728{
729 m_message_queue->waitAll(requests);
730}
731
732/*---------------------------------------------------------------------------*/
733/*---------------------------------------------------------------------------*/
734
737{
738 return m_mpi_parallel_mng->getMPICommunicator();
739}
740
741/*---------------------------------------------------------------------------*/
742/*---------------------------------------------------------------------------*/
743
744MP::Communicator HybridParallelMng::
745communicator() const
746{
747 return m_mpi_parallel_mng->communicator();
748}
749
750/*---------------------------------------------------------------------------*/
751/*---------------------------------------------------------------------------*/
752
753MP::Communicator HybridParallelMng::
755{
756 return m_mpi_parallel_mng->machineCommunicator();
757}
758
759/*---------------------------------------------------------------------------*/
760/*---------------------------------------------------------------------------*/
761
764{
765 PointToPointMessageInfo p2p_message(message);
766 p2p_message.setEmiterRank(MessageRank(m_global_rank));
767 return p2p_message;
768}
769
770/*---------------------------------------------------------------------------*/
771/*---------------------------------------------------------------------------*/
772
774buildMessage(Int32 dest, Parallel::eBlockingType blocking_mode)
775{
776 return buildMessage({ MessageRank(dest), blocking_mode });
777}
778
779/*---------------------------------------------------------------------------*/
780/*---------------------------------------------------------------------------*/
781
782IParallelMng* HybridParallelMng::
783_createSubParallelMng(Int32ConstArrayView kept_ranks)
784{
785 ARCANE_UNUSED(kept_ranks);
786 ARCANE_THROW(NotSupportedException, "Use createSubParallelMngRef() instead");
787}
788
789/*---------------------------------------------------------------------------*/
790/*---------------------------------------------------------------------------*/
791
794{
795 // ATTENTION: This method is called simultaneously by all threads
796 // sharing this HybridParallelMng.
797
798 if (kept_ranks.empty())
799 ARCANE_FATAL("kept_ranks is empty");
800 ARCANE_CHECK_POINTER(m_sub_builder_factory);
801
802 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF";
803
804 /*
805 There are several possibilities:
806 1. We just reduce the number of ranks in shared memory for each
807 MPI process - we create a HybridParallelMng
808 2. We only keep the master rank of each MPI process -> we create an MpiParallelMng.
809 3. We only keep the ranks of the same process -> we create a SharedMemoryParallelMng
810 4. We only keep a single rank: we create an MpiSequentialParallelMng.
811 */
812 // For now, we only support cases 1 and 2.
813 Int32 nb_kept_rank = kept_ranks.size();
814
815 // Determines the new number of local ranks per MPI rank.
816
817 // Checks if I am in the list of kept ranks and, if so,
818 // determines my rank in the created IParallelMng
819 Int32 first_global_rank_in_this_mpi = m_global_rank - m_local_rank;
820 Int32 last_global_rank_in_this_mpi = first_global_rank_in_this_mpi + m_local_nb_rank - 1;
821 // My new global rank. Negative if I am not in the new communicator
822 Int32 my_new_global_rank = (-1);
823 Int32 new_local_nb_rank = 0;
824 Int32 my_new_local_rank = (-1);
825 for (Integer i = 0; i < nb_kept_rank; ++i) {
826 Int32 kept_rank = kept_ranks[i];
827 if (kept_rank >= first_global_rank_in_this_mpi && kept_rank < last_global_rank_in_this_mpi)
828 ++new_local_nb_rank;
829 if (kept_rank == m_global_rank) {
830 my_new_global_rank = i;
831 my_new_local_rank = new_local_nb_rank - 1;
832 }
833 }
834 bool has_new_rank = (my_new_global_rank != (-1));
835
836 // Calculates the min, max, and sum of the new number.
837 // Two cases can occur:
838 // 1. The min and max are equal and greater than or equal to 2: In this case, we create
839 // a HybridParallelMng.
840 // 2. The max is 1. In this case, we create a new IParallelMng via the MpiParallelMng.
841 // The current ranks for which 'new_local_nb_rank' is 0 will not be in this
842 // new communicator. This case also applies when only one rank remains
843 // at the end.
844
845 Int32 min_new_local_nb_rank = -1;
846 Int32 max_new_local_nb_rank = -1;
847 Int32 sum_new_local_nb_rank = -1;
848 Int32 min_rank = A_NULL_RANK;
849 Int32 max_rank = A_NULL_RANK;
850 computeMinMaxSum(new_local_nb_rank, min_new_local_nb_rank, max_new_local_nb_rank,
851 sum_new_local_nb_rank, min_rank, max_rank);
852
853 m_trace->info() << "CREATE SUB_PARALLEL_MNG_REF new_local_nb_rank=" << new_local_nb_rank
854 << " min=" << min_new_local_nb_rank
855 << " max=" << max_new_local_nb_rank
856 << " sum=" << sum_new_local_nb_rank
857 << " new_global_rank=" << my_new_global_rank;
858
859 // If only one local rank remains, then we only build an MpiParallelMng.
860 // Only the PE that has a new rank is concerned and does this
861 if (max_new_local_nb_rank == 1) {
862 Integer nb_mpi_rank = m_mpi_parallel_mng->commSize();
863 // We must calculate the new MPI ranks.
864 // If 'min_new_local_nb_rank' is 1, it's simple because it means we keep
865 // all current MPI ranks (equivalent to an MPI_Comm_dup). Otherwise, we
866 // retrieve for each MPI rank whether it will be in the new communicator and
867 // build the list of kept ranks based on that.
868 // NOTE: in all cases, we must ensure that only one thread uses the
869 // 'm_mpi_parallel_mng'.
870 UniqueArray<Int32> kept_mpi_ranks;
872 bool do_mpi_call = false;
873 if (min_new_local_nb_rank == 1) {
874 if (has_new_rank) {
875 do_mpi_call = true;
876 kept_mpi_ranks.resize(nb_mpi_rank);
877 for (Int32 x = 0; x < nb_mpi_rank; ++x)
878 kept_mpi_ranks[x] = x;
879 }
880 }
881 else {
882 // If I am not in the new communicator, local rank 0 must
883 // 'gather'.
884 UniqueArray<Int16> gathered_ranks(nb_mpi_rank);
885 if (has_new_rank || m_local_rank == 0) {
886 do_mpi_call = true;
887 Int16 v = (has_new_rank) ? 1 : 0;
888 m_mpi_parallel_mng->allGather(ArrayView<Int16>(1, &v), gathered_ranks);
889 }
890 for (Int32 x = 0; x < nb_mpi_rank; ++x)
891 if (gathered_ranks[x] == 1)
892 kept_mpi_ranks.add(x);
893 }
894 if (do_mpi_call)
895 return m_mpi_parallel_mng->createSubParallelMngRef(kept_mpi_ranks);
896 else
897 return Ref<IParallelMng>();
898 }
899
900 if (max_new_local_nb_rank != new_local_nb_rank)
901 ARCANE_FATAL("Not same number of new local ranks on every MPI processus: current={0} max={1}",
902 new_local_nb_rank, max_new_local_nb_rank);
903
904 if (max_new_local_nb_rank < 2)
905 ARCANE_FATAL("number of local ranks is too low current={0} minimum=2", new_local_nb_rank);
906
907 // Wait a local barrier to ensure everyone waits here.
908 m_thread_barrier->wait();
909
910 // NOTE: The builder contains the common parts of the created IParallelMngs. It must
911 // therefore be referenced by them, otherwise it will be destroyed at the end
912 // of this method.
914
915 // Rank 0 creates the builder
916 if (m_local_rank == 0) {
917 // Assuming we have the same number of MPI ranks as before, we use
918 // the MPI communicator we already have.
919 MP::Communicator c = communicator();
920 MP::Communicator mc = machineCommunicator();
921 builder = m_sub_builder_factory->_createParallelMngBuilder(new_local_nb_rank, c, mc);
922 // Position the builder for everyone
923 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
924 }
925 // Wait to ensure all threads see the correct builder.
926 m_thread_barrier->wait();
927
928 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
929 ARCANE_CHECK_POINTER(builder.get());
930
931 Ref<IParallelMng> new_parallel_mng;
932 if (my_new_local_rank >= 0) {
933 new_parallel_mng = builder->_createParallelMng(my_new_local_rank, traceMng());
934 }
935 m_thread_barrier->wait();
936
937 // Here, everyone has created their IParallelMng. We can therefore
938 // release the reference to the builder. The created IParallelMngs keep
939 // a reference to the builder
940 if (m_local_rank == 0) {
941 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
942 }
943 m_thread_barrier->wait();
944
945 return new_parallel_mng;
946}
947
948/*---------------------------------------------------------------------------*/
949/*---------------------------------------------------------------------------*/
950
953{
954 return m_utils_factory;
955}
956
957/*---------------------------------------------------------------------------*/
958/*---------------------------------------------------------------------------*/
959
960bool HybridParallelMng::
961_isAcceleratorAware() const
962{
963 return m_mpi_parallel_mng->_internalApi()->isAcceleratorAware();
964}
965
966/*---------------------------------------------------------------------------*/
967/*---------------------------------------------------------------------------*/
968
969} // End namespace Arcane::MessagePassing
970
971/*---------------------------------------------------------------------------*/
972/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro returning the pointer ptr if it is not null or throwing an exception if it is null.
#define ARCANE_THROW(exception_class,...)
Macro for throwing an exception with formatting.
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
Brief list of message exchange functions.
Modifiable view of an array of type T.
void resize(Int64 s)
Changes the number of elements in the array to s.
void add(ConstReferenceType val)
Adds element val to the end of the array.
Constant view of an array of type T.
constexpr Integer size() const noexcept
Number of elements in the array.
constexpr bool empty() const noexcept
true if the array is empty (size()==0)
Operations to access variable values from another subdomain.
Interface of the input/output manager.
Definition IIOMng.h:37
Interface of an entity family.
Definition IItemFamily.h:83
Information exchange between processors.
virtual bool isAcceleratorAware() const =0
Indicates if the implementation handles accelerators.
Interface of the parallelism manager for a subdomain.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calculates the sum, min, and max of a value in one operation.
Brief information on parallel subdomain replication.
Information on the computing core allocation topology.
virtual TraceMessage info()=0
Stream for an information message.
Sends values across different processors.
Interface of a variable synchronization service.
Mesh entity group.
Definition ItemGroup.h:51
Interface for a message queue with threads.
void machineBarrier() override
Method allowing a barrier for the sub-domains of the computing node.
void initializeWindowCreator() override
Method allowing the initialization of the windowCreator specific to the implementation.
MemoryAllocationOptions machineShMemWinMemoryAllocator() override
Method allowing retrieval of a shared memory allocator.
bool isMachineShMemWinAvailable() override
Method allowing to know if shared memory mode is supported.
ConstArrayView< Int32 > machineRanks() override
Method allowing retrieval of the ranks of the sub-domains of the computing node.
Ref< IContigMachineShMemWinBaseInternal > createContigMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Method allowing the creation of a memory window on the node.
Ref< IMachineShMemWinBaseInternal > createMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Method allowing the creation of a dynamic memory window on the node.
Implementation of IRequestList for HybridParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Performs the wait or test.
Thread-based parallelism manager.
IParallelMng * worldParallelMng() const override
Parallelism manager over all allocated resources.
void freeRequests(ArrayView< Request > requests) override
Frees the requests.
void printStats() override
Prints statistics related to this parallelism manager.
IParallelExchanger * createExchanger() override
Returns an interface for transferring messages between processors.
void barrier() override
Performs a barrier.
MP::Communicator communicator() const override
MPI communicator associated with this manager.
bool m_is_initialized
true if already initialized
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Creates a non-blocking message to receive serialized data from rank rank.
Ref< Parallel::IRequestList > createRequestListRef() override
Creates a request list for this manager.
ITimerMng * timerMng() const override
Timer manager.
void build() override
Constructs the instance.
void waitAllRequests(ArrayView< Request > requests) override
Blocks while waiting for the rvalues requests to complete.
void initialize() override
Initializes the parallelism manager.
IThreadMng * threadMng() const override
Thread manager.
Int32 m_global_nb_rank
Total number of global ranks.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Probes if messages are available.
Int32 m_local_rank
Local rank of the current processor.
ITraceMng * traceMng() const override
Trace manager.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Returns an operation to retrieve the values of a variable on the entities of another subdomain.
void * getMPICommunicator() override
Address of the MPI communicator associated with this manager.
IParallelReplication * replication() const override
Replication information.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Factory for utility functions.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Creates a new parallelism manager for a subset of ranks.
MP::Communicator machineCommunicator() const override
MPI communicator derived from the communicator communicator() gathering all processes of the compute ...
ITransferValuesParallelOperation * createTransferValuesOperation() override
Returns an operation to transfer values between subdomains.
IParallelTopology * createTopology() override
Creates an instance containing information about the rank topology of this manager.
void setReplication(IParallelReplication *v) override
Sets the Replication Information.
IParallelMng * sequentialParallelMng() override
Returns a sequential parallelism manager.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Returns an interface for synchronizing variables on the group of the family.
Int32 m_local_nb_rank
Number of local ranks.
MessageId probe(const PointToPointMessageInfo &message) override
Probes if messages are available.
ISerializeMessage * createSendSerializer(Int32 rank) override
Creates a non-blocking message to send serialized data to rank rank.
Int32 commRank() const override
Rank of this instance in the communicator.
Int32 m_global_rank
Current processor number.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Constructs a message with destination dest.
void processPendingMessages() override
Sends the messages in the list that have not yet been sent.
Integer waitMessages(Parallel::eWaitType wait_type) override
Waits until the messages have finished execution.
void addMessage(ISerializeMessage *msg) override
Adds a message to the list.
Interface of the message passing manager.
Information about the source of a message.
Information for sending/receiving a point-to-point message.
void setEmiterRank(MessageRank rank)
Positions the message sender rank.
IParallelMngInternal * _internalApi() override
Internal Arcane API.
Redirects the message management of sub-domains according to the argument type.
IMessagePassingMng * messagePassingMng() const override
Associated Arccore message passing manager.
ITimeStats * timeStats() const override
Associated statistics manager (can be null).
Base class of a factory for IParallelMng utility functions.
Brief information on parallel subdomain replication.
InstanceType * get() const
Associated instance or nullptr if none.
Reference to an instance.
Implementation of a buffer for serialization.
View of an array of elements of type T.
Definition Span.h:635
Timer manager.
Definition TimerMng.h:40
Positions the phase of the currently executing action.
Definition Timer.h:142
1D data vector with value semantics (STL style).
Declarations of types and methods used by message exchange mechanisms.
C void mpBroadcast(IMessagePassingMng *pm, Span< char > send_buf, Int32 rank)
@ WaitSome
Wait until all messages in the list are processed.
eBlockingType
Type indicating whether a message is blocking or not.
Concurrency implementation.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
std::int8_t Int8
Signed integer type of 8 bits.
ArrayView< Int64 > Int64ArrayView
C equivalent of a 1D array of 64-bit integers.
Definition UtilsTypes.h:451
Ref< TrueType > createRef(Args &&... args)
Creates an instance of type TrueType with arguments Args and returns a reference to it.
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
Array< Byte > ByteArray
Dynamic one-dimensional array of characters.
Definition UtilsTypes.h:121
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
Definition UtilsTypes.h:482
std::int16_t Int16
Signed integer type of 16 bits.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.
Info for constructing a HybridParallelMng.
Information to construct a SequentialParallelMng.