Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
MpiParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiParallelMng.cc (C) 2000-2026 */
9/* */
10/* Parallelism manager using MPI. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/Collection.h"
15#include "arcane/utils/Enumerator.h"
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/TimeoutException.h"
19#include "arcane/utils/NotImplementedException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/ITraceMng.h"
22#include "arcane/utils/ValueConvert.h"
23#include "arcane/utils/Exception.h"
24#include "arcane/utils/HPReal.h"
25
26#include "arcane/core/IIOMng.h"
27#include "arcane/core/Timer.h"
28#include "arcane/core/IItemFamily.h"
29#include "arcane/core/IParallelTopology.h"
30#include "arcane/core/parallel/IStat.h"
31#include "arcane/core/internal/SerializeMessage.h"
32#include "arcane/core/internal/ParallelMngInternal.h"
33#include "arcane/core/internal/MachineShMemWinMemoryAllocator.h"
34
35#include "arcane/parallel/mpi/MpiParallelMng.h"
36#include "arcane/parallel/mpi/MpiParallelDispatch.h"
37#include "arcane/parallel/mpi/MpiTimerMng.h"
38#include "arcane/parallel/mpi/MpiSerializeMessage.h"
39#include "arcane/parallel/mpi/MpiParallelNonBlockingCollective.h"
40#include "arcane/parallel/mpi/MpiDatatype.h"
41#include "arcane/parallel/mpi/IVariableSynchronizerMpiCommunicator.h"
42
43#include "arcane/impl/ParallelReplication.h"
44#include "arcane/impl/SequentialParallelMng.h"
45#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
46#include "arcane/impl/internal/VariableSynchronizer.h"
47
48#include "arccore/message_passing_mpi/MpiMessagePassingMng.h"
49#include "arccore/message_passing_mpi/internal/MpiSerializeDispatcher.h"
50#include "arccore/message_passing_mpi/internal/MpiRequestList.h"
51#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
52#include "arccore/message_passing_mpi/internal/MpiLock.h"
53#include "arccore/message_passing_mpi/internal/MpiMachineShMemWinBaseInternalCreator.h"
54#include "arccore/message_passing_mpi/internal/MpiContigMachineShMemWinBaseInternal.h"
55#include "arccore/message_passing_mpi/internal/MpiMachineShMemWinBaseInternal.h"
56#include "arccore/message_passing/Dispatchers.h"
58#include "arccore/message_passing/internal/SerializeMessageList.h"
59
60#include "arcane_packages.h"
61
62//#define ARCANE_TRACE_MPI
63
64/*---------------------------------------------------------------------------*/
65/*---------------------------------------------------------------------------*/
66
67namespace Arcane
68{
69using namespace Arcane::MessagePassing;
70using namespace Arcane::MessagePassing::Mpi;
72
73/*---------------------------------------------------------------------------*/
74/*---------------------------------------------------------------------------*/
75
76extern "C++" IIOMng*
77arcaneCreateIOMng(IParallelMng* psm);
78
79#if defined(ARCANE_HAS_MPI_NEIGHBOR)
80// Defined in MpiNeighborVariableSynchronizeDispatcher
82arcaneCreateMpiNeighborVariableSynchronizerFactory(MpiParallelMng* mpi_pm,
83 Ref<IVariableSynchronizerMpiCommunicator> synchronizer_communicator);
84#endif
86arcaneCreateMpiBlockVariableSynchronizerFactory(MpiParallelMng* mpi_pm, Int32 block_size, Int32 nb_sequence);
88arcaneCreateMpiVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
90arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
92arcaneCreateMpiLegacyVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
93#if defined(ARCANE_HAS_PACKAGE_NCCL)
95arcaneCreateNCCLVariableSynchronizerFactory(IParallelMng* mpi_pm);
96#endif
97
98/*---------------------------------------------------------------------------*/
99/*---------------------------------------------------------------------------*/
100
/*!
 * \brief Builds the creation parameters of a MpiParallelMng for the
 * communicator \a comm.
 *
 * Every pointer field (stat, trace_mng, timer_mng, thread_mng, mpi_lock) starts
 * as nullptr and must be filled by the caller when needed. \a machine_comm is
 * only stored in mpi_machine_comm. The communicators are flagged as owned
 * (is_mpi_comm_owned is true).
 */
101MpiParallelMngBuildInfo::
102MpiParallelMngBuildInfo(MPI_Comm comm, MPI_Comm machine_comm)
103: is_parallel(false)
104, comm_rank(MessagePassing::A_NULL_RANK)
105, comm_nb_rank(0)
106, stat(nullptr)
107, trace_mng(nullptr)
108, timer_mng(nullptr)
109, thread_mng(nullptr)
110, mpi_comm(comm)
111, mpi_machine_comm(machine_comm)
112, is_mpi_comm_owned(true)
113, mpi_lock(nullptr)
114{
// The placeholder rank/size set in the initializer list are overwritten here
// with the values MPI reports for 'comm'. The MPI return codes are not checked.
115 ::MPI_Comm_rank(comm, &comm_rank);
116 ::MPI_Comm_size(comm, &comm_nb_rank);
117
// The dispatchers container is created first because the message passing
// manager build info keeps the raw pointer obtained from the reference.
118 m_dispatchers_ref = createRef<MP::Dispatchers>();
119 MP::Mpi::MpiMessagePassingMng::BuildInfo bi(comm_rank, comm_nb_rank, m_dispatchers_ref.get(), mpi_comm);
120
121 m_message_passing_mng_ref = createRef<MP::Mpi::MpiMessagePassingMng>(bi);
122}
123
124/*---------------------------------------------------------------------------*/
125/*---------------------------------------------------------------------------*/
126
127/*---------------------------------------------------------------------------*/
128/*---------------------------------------------------------------------------*/
132class VariableSynchronizerMpiCommunicator
134{
135 public:
136
137 explicit VariableSynchronizerMpiCommunicator(MpiParallelMng* pm)
138 : m_mpi_parallel_mng(pm)
139 {}
140 ~VariableSynchronizerMpiCommunicator() override
141 {
142 _checkFreeCommunicator();
143 }
144 MPI_Comm communicator() const override
145 {
146 return m_topology_communicator;
147 }
148 void compute(VariableSynchronizer* var_syncer) override
149 {
150 Int32ConstArrayView comm_ranks = var_syncer->communicatingRanks();
151 const Int32 nb_message = comm_ranks.size();
152
153 MpiParallelMng* pm = m_mpi_parallel_mng;
154
155 MPI_Comm old_comm = pm->communicator();
156
157 UniqueArray<int> destinations(nb_message);
158 for (Integer i = 0; i < nb_message; ++i) {
159 destinations[i] = comm_ranks[i];
160 }
161
162 _checkFreeCommunicator();
163
164 int r = MPI_Dist_graph_create_adjacent(old_comm, nb_message, destinations.data(), MPI_UNWEIGHTED,
165 nb_message, destinations.data(), MPI_UNWEIGHTED,
166 MPI_INFO_NULL, 0, &m_topology_communicator);
167
168 if (r != MPI_SUCCESS)
169 ARCANE_FATAL("Error '{0}' in MPI_Dist_graph_create", r);
170
171 // Checks that the rank order for the MPI implementation is the same as the one we have in
172 // the VariableSynchronizer.
173 {
174 int indegree = 0;
175 int outdegree = 0;
176 int weighted = 0;
177 MPI_Dist_graph_neighbors_count(m_topology_communicator, &indegree, &outdegree, &weighted);
178
179 if (indegree != nb_message)
180 ARCANE_FATAL("Bad value '{0}' for 'indegree' (expected={1})", indegree, nb_message);
181 if (outdegree != nb_message)
182 ARCANE_FATAL("Bad value '{0}' for 'outdegree' (expected={1})", outdegree, nb_message);
183
184 UniqueArray<int> srcs(indegree);
185 UniqueArray<int> dsts(outdegree);
186
187 MPI_Dist_graph_neighbors(m_topology_communicator, indegree, srcs.data(), MPI_UNWEIGHTED, outdegree, dsts.data(), MPI_UNWEIGHTED);
188
189 for (int k = 0; k < outdegree; ++k) {
190 int x = dsts[k];
191 if (x != comm_ranks[k])
192 ARCANE_FATAL("Invalid destination rank order k={0} v={1} expected={2}", k, x, comm_ranks[k]);
193 }
194
195 for (int k = 0; k < indegree; ++k) {
196 int x = srcs[k];
197 if (x != comm_ranks[k])
198 ARCANE_FATAL("Invalid source rank order k={0} v={1} expected={2}", k, x, comm_ranks[k]);
199 }
200 }
201 }
202
203 private:
204
205 MpiParallelMng* m_mpi_parallel_mng = nullptr;
206 MPI_Comm m_topology_communicator = MPI_COMM_NULL;
207
208 private:
209
210 void _checkFreeCommunicator()
211 {
212 if (m_topology_communicator != MPI_COMM_NULL)
213 MPI_Comm_free(&m_topology_communicator);
214 m_topology_communicator = MPI_COMM_NULL;
215 }
216};
217
218/*---------------------------------------------------------------------------*/
219/*---------------------------------------------------------------------------*/
226class MpiVariableSynchronizer
227: public VariableSynchronizer
228{
229 public:
230
231 MpiVariableSynchronizer(IParallelMng* pm, const ItemGroup& group,
232 Ref<IDataSynchronizeImplementationFactory> implementation_factory,
234 : VariableSynchronizer(pm, group, implementation_factory)
235 , m_topology_info(topology_info)
236 {
237 }
238
239 public:
240
241 void compute() override
242 {
244 // If not null, calculate the topology
245 if (m_topology_info.get())
246 m_topology_info->compute(this);
247 }
248
249 private:
250
252};
253
254/*---------------------------------------------------------------------------*/
255/*---------------------------------------------------------------------------*/
256
257class MpiParallelMngUtilsFactory
259{
260 public:
261
262 MpiParallelMngUtilsFactory()
263 : m_synchronizer_version(2)
264 {
265 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION") == "1")
266 m_synchronizer_version = 1;
267 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION") == "2")
268 m_synchronizer_version = 2;
269 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION") == "3")
270 m_synchronizer_version = 3;
271 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION") == "4") {
272 m_synchronizer_version = 4;
273 String v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_BLOCK_SIZE");
274 if (!v.null()) {
275 Int32 block_size = 0;
276 if (!builtInGetValue(block_size, v))
277 m_synchronize_block_size = block_size;
278 m_synchronize_block_size = std::clamp(m_synchronize_block_size, 0, 1000000000);
279 }
280 v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_NB_SEQUENCE");
281 if (!v.null()) {
282 Int32 nb_sequence = 0;
283 if (!builtInGetValue(nb_sequence, v))
284 m_synchronize_nb_sequence = nb_sequence;
285 m_synchronize_nb_sequence = std::clamp(m_synchronize_nb_sequence, 1, 1024 * 1024);
286 }
287 }
288 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION") == "5")
289 m_synchronizer_version = 5;
290 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION") == "6")
291 m_synchronizer_version = 6;
292 }
293
294 public:
295
297 {
298 return _createSynchronizer(pm, family->allItems());
299 }
300
302 {
303 return _createSynchronizer(pm, group);
304 }
305
306 private:
307
308 Ref<IVariableSynchronizer> _createSynchronizer(IParallelMng* pm, const ItemGroup& group)
309 {
311 MpiParallelMng* mpi_pm = ARCANE_CHECK_POINTER(dynamic_cast<MpiParallelMng*>(pm));
312 ITraceMng* tm = pm->traceMng();
314 // Only displays information for the group of all cells to avoid displaying
315 // the same message multiple times.
316 bool do_print = (group.isAllItems() && group.itemKind() == IK_Cell);
317 if (m_synchronizer_version == 2) {
318 if (do_print)
319 tm->info() << "Using MpiSynchronizer V2";
320 generic_factory = arcaneCreateMpiVariableSynchronizerFactory(mpi_pm);
321 }
322 else if (m_synchronizer_version == 3) {
323 if (do_print)
324 tm->info() << "Using MpiSynchronizer V3";
325 generic_factory = arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(mpi_pm);
326 }
327 else if (m_synchronizer_version == 4) {
328 if (do_print)
329 tm->info() << "Using MpiSynchronizer V4 block_size=" << m_synchronize_block_size
330 << " nb_sequence=" << m_synchronize_nb_sequence;
331 generic_factory = arcaneCreateMpiBlockVariableSynchronizerFactory(mpi_pm, m_synchronize_block_size, m_synchronize_nb_sequence);
332 }
333 else if (m_synchronizer_version == 5) {
334 if (do_print)
335 tm->info() << "Using MpiSynchronizer V5";
336 topology_info = createRef<VariableSynchronizerMpiCommunicator>(mpi_pm);
337#if defined(ARCANE_HAS_MPI_NEIGHBOR)
338 generic_factory = arcaneCreateMpiNeighborVariableSynchronizerFactory(mpi_pm, topology_info);
339#else
340 throw NotSupportedException(A_FUNCINFO, "Synchronize implementation V5 is not supported with this version of MPI");
341#endif
342 }
343#if defined(ARCANE_HAS_PACKAGE_NCCL)
344 else if (m_synchronizer_version == 6) {
345 if (do_print)
346 tm->info() << "Using NCCLSynchronizer";
347 generic_factory = arcaneCreateNCCLVariableSynchronizerFactory(mpi_pm);
348 }
349#endif
350 else {
351 if (do_print)
352 tm->info() << "Using MpiSynchronizer V1";
353 generic_factory = arcaneCreateMpiLegacyVariableSynchronizerFactory(mpi_pm);
354 }
355 if (!generic_factory.get())
356 ARCANE_FATAL("No factory created");
357 return createRef<MpiVariableSynchronizer>(pm, group, generic_factory, topology_info);
358 }
359
360 private:
361
362 Integer m_synchronizer_version = 1;
363 Int32 m_synchronize_block_size = 32000;
364 Int32 m_synchronize_nb_sequence = 1;
365};
366
367/*---------------------------------------------------------------------------*/
368/*---------------------------------------------------------------------------*/
369
370/*---------------------------------------------------------------------------*/
371/*---------------------------------------------------------------------------*/
372
374: public ParallelMngInternal
375{
376 public:
377
378 explicit Impl(MpiParallelMng* pm)
379 : ParallelMngInternal(pm)
380 , m_parallel_mng(pm)
381 , m_alloc(makeRef(new MachineShMemWinMemoryAllocator(pm)))
382 {}
383
384 ~Impl() override = default;
385
386 public:
387
388 Int32 masterParallelIORank() const override { return m_parallel_mng->commRank(); }
389 Int32 nbSendersToMasterParallelIO() const override { return 1; }
390
392 {
393 m_parallel_mng->traceMng()->info() << "initializeWindowCreator() MPI";
394 m_parallel_mng->adapter()->initializeWindowCreator(m_parallel_mng->machineCommunicator());
395 }
396
398 {
399 if (m_shmem_available == 1) {
400 return true;
401 }
402
403 if (m_shmem_available == 0) {
404 Ref<IParallelTopology> topo = m_parallel_mng->_internalUtilsFactory()->createTopology(m_parallel_mng);
405 if (topo->machineRanks().size() == m_parallel_mng->adapter()->windowCreator()->machineRanks().size()) {
406 m_shmem_available = 1;
407 return true;
408 }
409 // Problem with MPI. May occur if MPICH is compiled in ch3:sock mode.
410 m_shmem_available = 2;
411 return false;
412 }
413
414 return false;
415 }
416
418 {
419 return makeRef(m_parallel_mng->adapter()->windowCreator()->createWindow(sizeof_segment, sizeof_type));
420 }
421
423 {
424 return makeRef(m_parallel_mng->adapter()->windowCreator()->createDynamicWindow(sizeof_segment, sizeof_type));
425 }
426
428 {
429 return MemoryAllocationOptions{ m_alloc.get() };
430 }
431
433 {
434 return m_parallel_mng->adapter()->windowCreator()->machineRanks();
435 }
436
437 void machineBarrier() override
438 {
439 m_parallel_mng->adapter()->windowCreator()->machineBarrier();
440 }
441
442 private:
443
444 MpiParallelMng* m_parallel_mng;
446
447 // 0 = Attribute not initialized
448 // 1 = Shared memory available
449 // 2 = Shared memory not available
450 Int8 m_shmem_available = 0;
451};
452
453/*---------------------------------------------------------------------------*/
454/*---------------------------------------------------------------------------*/
455
456/*---------------------------------------------------------------------------*/
457/*---------------------------------------------------------------------------*/
458
/*!
 * \brief Creates the manager from the information of \a bi.
 *
 * Only the members are set here: the adapter, the dispatchers, the I/O
 * manager and the other services are created later by build().
 * A default ParallelReplication, the MPI utils factory and the internal
 * implementation (Impl) are allocated by this constructor.
 */
459MpiParallelMng::
460MpiParallelMng(const MpiParallelMngBuildInfo& bi)
461: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.dispatchersRef(), bi.messagePassingMngRef()))
462, m_trace(bi.trace_mng)
463, m_thread_mng(bi.thread_mng)
464, m_world_parallel_mng(bi.world_parallel_mng)
465, m_timer_mng(bi.timer_mng)
466, m_replication(new ParallelReplication())
467, m_is_parallel(bi.is_parallel)
468, m_comm_rank(bi.commRank())
469, m_comm_size(bi.commSize())
470, m_stat(bi.stat)
471, m_communicator(bi.mpiComm())
472, m_machine_communicator(bi.mpiMachineComm())
473, m_is_communicator_owned(bi.is_mpi_comm_owned)
474, m_mpi_lock(bi.mpi_lock)
475, m_non_blocking_collective(nullptr)
476, m_utils_factory(createRef<MpiParallelMngUtilsFactory>())
477, m_parallel_mng_internal(new Impl(this))
478{
// Without an explicit world manager this instance is its own world manager.
// NOTE(review): m_trace is dereferenced here, so bi.trace_mng must not be
// null when bi.world_parallel_mng is null.
479 if (!m_world_parallel_mng) {
480 m_trace->debug() << "[MpiParallelMng] No m_world_parallel_mng found, reverting to ourselves!";
481 m_world_parallel_mng = this;
482 }
483}
484
485/*---------------------------------------------------------------------------*/
486/*---------------------------------------------------------------------------*/
487
488MpiParallelMng::
489~MpiParallelMng()
490{
491 delete m_parallel_mng_internal;
492 delete m_non_blocking_collective;
493 m_sequential_parallel_mng.reset();
494 if (m_is_communicator_owned) {
495 MpiLock::Section ls(m_mpi_lock);
496 MPI_Comm_free(&m_communicator);
497 MPI_Comm_free(&m_machine_communicator);
498 }
499 delete m_replication;
500 delete m_io_mng;
501 if (m_is_timer_owned)
502 delete m_timer_mng;
503 arcaneCallFunctionAndTerminateIfThrow([&]() { m_adapter->destroy(); });
504 delete m_datatype_list;
505}
506
507/*---------------------------------------------------------------------------*/
508/*---------------------------------------------------------------------------*/
509
510namespace
511{
512
513 /*---------------------------------------------------------------------------*/
514 /*---------------------------------------------------------------------------*/
515 // Class to create the different dispatchers
516 class DispatchCreator
517 {
518 public:
519
// Stores the objects given to every dispatcher created by create().
// The pointers are kept as given; nothing is allocated or released here.
520 DispatchCreator(ITraceMng* tm, IMessagePassingMng* mpm, MpiAdapter* adapter, MpiDatatypeList* datatype_list)
521 : m_tm(tm)
522 , m_mpm(mpm)
523 , m_adapter(adapter)
524 , m_datatype_list(datatype_list)
525 {}
526
527 public:
528
// Allocates with 'new' the dispatcher for the type 'DataType'.
// The MPI datatype is looked up in the datatype list from a
// default-constructed 'DataType'. The returned pointer is not kept by this
// class.
529 template <typename DataType> MpiParallelDispatchT<DataType>*
530 create()
531 {
532 MpiDatatype* dt = m_datatype_list->datatype(DataType());
533 return new MpiParallelDispatchT<DataType>(m_tm, m_mpm, m_adapter, dt);
534 }
535
536 ITraceMng* m_tm;
537 IMessagePassingMng* m_mpm;
538 MpiAdapter* m_adapter;
539 MpiDatatypeList* m_datatype_list;
540 };
541
542 /*---------------------------------------------------------------------------*/
543 /*---------------------------------------------------------------------------*/
544
// Control dispatcher which delegates the communicator split and the profiler
// handling to the MPI adapter. The other operations are those of
// ParallelMngDispatcher::DefaultControlDispatcher.
545 class ControlDispatcherDecorator
546 : public ParallelMngDispatcher::DefaultControlDispatcher
547 {
548 public:
549
// 'adapter' is kept as a plain pointer and is not released by this class.
550 ControlDispatcherDecorator(IParallelMng* pm, MpiAdapter* adapter)
551 : ParallelMngDispatcher::DefaultControlDispatcher(pm)
552 , m_adapter(adapter)
553 {}
554
// Forwards the split to the adapter and returns its result unchanged.
555 IMessagePassingMng* commSplit(bool keep) override
556 {
557 return m_adapter->commSplit(keep);
558 }
// The profiler is stored by the adapter, not by this class.
559 MP::IProfiler* profiler() const override { return m_adapter->profiler(); }
560 void setProfiler(MP::IProfiler* p) override { m_adapter->setProfiler(p); }
561
562 private:
563
564 MpiAdapter* m_adapter;
565 };
566} // namespace
567
568/*---------------------------------------------------------------------------*/
569/*---------------------------------------------------------------------------*/
570
572build()
573{
574 ITraceMng* tm = traceMng();
575 if (!m_timer_mng) {
576 m_timer_mng = new MpiTimerMng(tm);
577 m_is_timer_owned = true;
578 }
579
580 // Created the associated sequential manager.
581 {
583 bi.setTraceMng(traceMng());
584 bi.setCommunicator(communicator());
585 bi.setThreadMng(threadMng());
586 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
587 }
588
589 // Indicates that reduces must be performed in processor order
590 // in order to guarantee deterministic execution
591 bool is_ordered_reduce = false;
592 if (platform::getEnvironmentVariable("ARCANE_ORDERED_REDUCE") == "TRUE")
593 is_ordered_reduce = true;
594 m_datatype_list = new MpiDatatypeList(is_ordered_reduce);
595
596 ARCANE_CHECK_POINTER(m_stat);
597
598 MpiAdapter* adapter = new MpiAdapter(m_trace, m_stat->toArccoreStat(), m_communicator, m_mpi_lock);
599 m_adapter = adapter;
600 auto mpm = _messagePassingMng();
601
602 // NOTE: this instance will be destroyed by the ParallelMngDispatcher
603 auto* control_dispatcher = new ControlDispatcherDecorator(this, m_adapter);
604 _setControlDispatcher(control_dispatcher);
605
606 // NOTE: this instance will be destroyed by the ParallelMngDispatcher
607 auto* serialize_dispatcher = new MpiSerializeDispatcher(m_adapter, mpm);
608 m_mpi_serialize_dispatcher = serialize_dispatcher;
609 _setSerializeDispatcher(serialize_dispatcher);
610
611 DispatchCreator creator(m_trace, mpm, m_adapter, m_datatype_list);
612 this->createDispatchers(creator);
613
614 m_io_mng = arcaneCreateIOMng(this);
615
616 m_non_blocking_collective = new MpiParallelNonBlockingCollective(tm, this, adapter);
617 m_non_blocking_collective->build();
618 if (m_mpi_lock)
619 m_trace->info() << "Using mpi with locks.";
620
621 m_parallel_mng_internal->initializeWindowCreator();
622}
623
624/*---------------------------------------------------------------------------*/
625/*---------------------------------------------------------------------------*/
626
627/*----------------------------------------------------------------------------*/
628/*---------------------------------------------------------------------------*/
629
632{
633 Trace::Setter mci(m_trace, "Mpi");
634 if (m_is_initialized) {
635 m_trace->warning() << "MpiParallelMng already initialized";
636 return;
637 }
638
639 // Initialization of MpiParallelMng
640 m_trace->info() << "Initialisation de MpiParallelMng";
641 m_sequential_parallel_mng->initialize();
642
643 m_adapter->setTimeMetricCollector(timeMetricCollector());
644
645 m_is_initialized = true;
646}
647
648/*---------------------------------------------------------------------------*/
649/*---------------------------------------------------------------------------*/
650
651void MpiParallelMng::
652sendSerializer(ISerializer* s, Int32 rank)
653{
654 Trace::Setter mci(m_trace, "Mpi");
655 Timer::Phase tphase(timeStats(), TP_Communication);
657 m_mpi_serialize_dispatcher->legacySendSerializer(s, { MessageRank(rank), mpi_tag, Blocking });
658}
659
660/*---------------------------------------------------------------------------*/
661/*---------------------------------------------------------------------------*/
662
665{
666 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
667}
668
669/*---------------------------------------------------------------------------*/
670/*---------------------------------------------------------------------------*/
671
672Request MpiParallelMng::
673sendSerializer(ISerializer* s, Int32 rank, [[maybe_unused]] ByteArray& bytes)
674{
675 Trace::Setter mci(m_trace, "Mpi");
676 Timer::Phase tphase(timeStats(), TP_Communication);
678 return m_mpi_serialize_dispatcher->legacySendSerializer(s, { MessageRank(rank), mpi_tag, NonBlocking });
679}
680
681/*---------------------------------------------------------------------------*/
682/*---------------------------------------------------------------------------*/
683
/*!
 * \brief Broadcasts the serializer \a values through the MPI serialize dispatcher.
 *
 * \a rank is converted to a MessageRank and given to the dispatcher
 * (presumably the rank of the sender; to be confirmed in the dispatcher).
 * The time spent is accounted in the TP_Communication phase.
 */
684void MpiParallelMng::
685broadcastSerializer(ISerializer* values, Int32 rank)
686{
687 Timer::Phase tphase(timeStats(), TP_Communication);
688 m_mpi_serialize_dispatcher->broadcastSerializer(values, MessageRank(rank));
689}
690
691/*---------------------------------------------------------------------------*/
692/*---------------------------------------------------------------------------*/
693
694void MpiParallelMng::
695recvSerializer(ISerializer* values, Int32 rank)
696{
697 Trace::Setter mci(m_trace, "Mpi");
698 Timer::Phase tphase(timeStats(), TP_Communication);
700 m_mpi_serialize_dispatcher->legacyReceiveSerializer(values, MessageRank(rank), mpi_tag);
701}
702
703/*---------------------------------------------------------------------------*/
704/*---------------------------------------------------------------------------*/
705
708{
709 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
710}
711
712/*---------------------------------------------------------------------------*/
713/*---------------------------------------------------------------------------*/
714
716probe(const PointToPointMessageInfo& message) -> MessageId
717{
718 return m_adapter->probeMessage(message);
719}
720
721/*---------------------------------------------------------------------------*/
722/*---------------------------------------------------------------------------*/
723
726{
727 return m_adapter->legacyProbeMessage(message);
728}
729
730/*---------------------------------------------------------------------------*/
731/*---------------------------------------------------------------------------*/
732
/*!
 * \brief Sends the serializer \a s with the point-to-point information \a message.
 *
 * The call is forwarded as is to the MPI serialize dispatcher.
 * \return the request returned by the dispatcher.
 */
733Request MpiParallelMng::
734sendSerializer(const ISerializer* s, const PointToPointMessageInfo& message)
735{
736 return m_mpi_serialize_dispatcher->sendSerializer(s, message);
737}
738
739/*---------------------------------------------------------------------------*/
740/*---------------------------------------------------------------------------*/
741
/*!
 * \brief Receives in the serializer \a s with the point-to-point information \a message.
 *
 * The call is forwarded as is to the MPI serialize dispatcher.
 * \return the request returned by the dispatcher.
 */
742Request MpiParallelMng::
743receiveSerializer(ISerializer* s, const PointToPointMessageInfo& message)
744{
745 return m_mpi_serialize_dispatcher->receiveSerializer(s, message);
746}
747
748/*---------------------------------------------------------------------------*/
749/*---------------------------------------------------------------------------*/
750
753{
754 for (Integer i = 0, is = requests.size(); i < is; ++i)
755 m_adapter->freeRequest(requests[i]);
756}
757
758/*---------------------------------------------------------------------------*/
759/*---------------------------------------------------------------------------*/
760
/*!
 * \brief Asks the MPI serialize dispatcher to check its finished sub-requests.
 *
 * In this file it is called after waiting for requests (waitAllRequests()
 * and RequestList::_wait()).
 */
761void MpiParallelMng::
762_checkFinishedSubRequests()
763{
764 m_mpi_serialize_dispatcher->checkFinishedSubRequests();
765}
766
767/*---------------------------------------------------------------------------*/
768/*---------------------------------------------------------------------------*/
769
/*!
 * \brief Returns a reference on the associated sequential manager.
 *
 * This manager is created by build() and released by the destructor.
 */
770Ref<IParallelMng> MpiParallelMng::
771sequentialParallelMngRef()
772{
773 return m_sequential_parallel_mng;
774}
775
778{
779 return m_sequential_parallel_mng.get();
780}
781
782/*---------------------------------------------------------------------------*/
783/*---------------------------------------------------------------------------*/
784
787{
788 if (m_stat)
789 m_stat->print(m_trace);
790}
791
792/*---------------------------------------------------------------------------*/
793/*---------------------------------------------------------------------------*/
794
796barrier()
797{
798 traceMng()->flush();
799 m_adapter->barrier();
800}
801
802/*---------------------------------------------------------------------------*/
803/*---------------------------------------------------------------------------*/
804
807{
808 m_adapter->waitAllRequests(requests);
809 _checkFinishedSubRequests();
810}
811
812/*---------------------------------------------------------------------------*/
813/*---------------------------------------------------------------------------*/
814
817{
818 return _waitSomeRequests(requests, false);
819}
820
821/*---------------------------------------------------------------------------*/
822/*---------------------------------------------------------------------------*/
823
826{
827 return _waitSomeRequests(requests, true);
828}
829
830/*---------------------------------------------------------------------------*/
831/*---------------------------------------------------------------------------*/
832
833UniqueArray<Integer> MpiParallelMng::
834_waitSomeRequests(ArrayView<Request> requests, bool is_non_blocking)
835{
836 UniqueArray<Integer> results;
837 UniqueArray<bool> done_indexes(requests.size());
838
839 m_adapter->waitSomeRequests(requests, done_indexes, is_non_blocking);
840 for (int i = 0; i < requests.size(); i++) {
841 if (done_indexes[i])
842 results.add(i);
843 }
844 return results;
845}
846
847/*---------------------------------------------------------------------------*/
848/*---------------------------------------------------------------------------*/
849
/*!
 * \brief Allocates a new list of serialize messages bound to the message passing manager.
 *
 * The instance is allocated with 'new' and returned as a plain pointer.
 */
850ISerializeMessageList* MpiParallelMng::
851_createSerializeMessageList()
852{
853 return new MP::internal::SerializeMessageList(messagePassingMng());
854}
855
856/*---------------------------------------------------------------------------*/
857/*---------------------------------------------------------------------------*/
858
861{
862 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
863}
864
865/*---------------------------------------------------------------------------*/
866/*---------------------------------------------------------------------------*/
867
870{
871 return m_utils_factory->createTransferValuesOperation(this)._release();
872}
873
874/*---------------------------------------------------------------------------*/
875/*---------------------------------------------------------------------------*/
876
879{
880 return m_utils_factory->createExchanger(this)._release();
881}
882
883/*---------------------------------------------------------------------------*/
884/*---------------------------------------------------------------------------*/
885
888{
889 return m_utils_factory->createSynchronizer(this, family)._release();
890}
891
892/*---------------------------------------------------------------------------*/
893/*---------------------------------------------------------------------------*/
894
896createSynchronizer(const ItemGroup& group)
897{
898 return m_utils_factory->createSynchronizer(this, group)._release();
899}
900
901/*---------------------------------------------------------------------------*/
902/*---------------------------------------------------------------------------*/
903
906{
907 return m_utils_factory->createTopology(this)._release();
908}
909
910/*---------------------------------------------------------------------------*/
911/*---------------------------------------------------------------------------*/
912
914replication() const
915{
916 return m_replication;
917}
918
919/*---------------------------------------------------------------------------*/
920/*---------------------------------------------------------------------------*/
921
924{
925 delete m_replication;
926 m_replication = v;
927}
928
929/*---------------------------------------------------------------------------*/
930/*---------------------------------------------------------------------------*/
931
932IParallelMng* MpiParallelMng::
933_createSubParallelMng(MPI_Comm sub_communicator)
934{
935 // If null, this rank is not part of the sub-communicator
936 if (sub_communicator == MPI_COMM_NULL)
937 return nullptr;
938
939 int sub_rank = -1;
940 MPI_Comm_rank(sub_communicator, &sub_rank);
941
942 MPI_Comm sub_machine_communicator = MPI_COMM_NULL;
943 MPI_Comm_split_type(sub_communicator, MPI_COMM_TYPE_SHARED, sub_rank, MPI_INFO_NULL, &sub_machine_communicator);
944
945 MpiParallelMngBuildInfo bi(sub_communicator, sub_machine_communicator);
946 bi.is_parallel = isParallel();
947 bi.stat = m_stat;
948 bi.timer_mng = m_timer_mng;
949 bi.thread_mng = m_thread_mng;
950 bi.trace_mng = m_trace;
951 bi.world_parallel_mng = m_world_parallel_mng;
952 bi.mpi_lock = m_mpi_lock;
953
954 IParallelMng* sub_pm = new MpiParallelMng(bi);
955 sub_pm->build();
956 return sub_pm;
957}
958
959/*---------------------------------------------------------------------------*/
960/*---------------------------------------------------------------------------*/
961
962Ref<IParallelMng> MpiParallelMng::
963_createSubParallelMngRef(Int32 color, Int32 key)
964{
965 if (color < 0)
966 color = MPI_UNDEFINED;
967 MPI_Comm sub_communicator = MPI_COMM_NULL;
968 MPI_Comm_split(m_communicator, color, key, &sub_communicator);
969 IParallelMng* sub_pm = _createSubParallelMng(sub_communicator);
970 return makeRef(sub_pm);
971}
972
973/*---------------------------------------------------------------------------*/
974/*---------------------------------------------------------------------------*/
975
976IParallelMng* MpiParallelMng::
977_createSubParallelMng(Int32ConstArrayView kept_ranks)
978{
979 MPI_Group mpi_group = MPI_GROUP_NULL;
980 MPI_Comm_group(m_communicator, &mpi_group);
981 Integer nb_sub_rank = kept_ranks.size();
982 UniqueArray<int> mpi_kept_ranks(nb_sub_rank);
983 for (Integer i = 0; i < nb_sub_rank; ++i)
984 mpi_kept_ranks[i] = (int)kept_ranks[i];
985
986 MPI_Group final_group = MPI_GROUP_NULL;
987 MPI_Group_incl(mpi_group, nb_sub_rank, mpi_kept_ranks.data(), &final_group);
988 MPI_Comm sub_communicator = MPI_COMM_NULL;
989
990 MPI_Comm_create(m_communicator, final_group, &sub_communicator);
991 MPI_Group_free(&final_group);
992 return _createSubParallelMng(sub_communicator);
993}
994
995/*---------------------------------------------------------------------------*/
996/*---------------------------------------------------------------------------*/
997
1006: public MpiRequestList
1007{
1008 using Base = MpiRequestList;
1009
1010 public:
1011
1012 explicit RequestList(MpiParallelMng* pm)
1013 : Base(pm->m_adapter)
1014 , m_parallel_mng(pm)
1015 {}
1016
1017 public:
1018
1019 void _wait(Parallel::eWaitType wait_type) override
1020 {
1021 Base::_wait(wait_type);
1022 m_parallel_mng->_checkFinishedSubRequests();
1023 };
1024
1025 private:
1026
1027 MpiParallelMng* m_parallel_mng;
1028};
1029
1030/*---------------------------------------------------------------------------*/
1031/*---------------------------------------------------------------------------*/
1032
1038
1039/*---------------------------------------------------------------------------*/
1040/*---------------------------------------------------------------------------*/
1041
1044{
1045 return m_utils_factory;
1046}
1047
1048/*---------------------------------------------------------------------------*/
1049/*---------------------------------------------------------------------------*/
1050
1051bool MpiParallelMng::
1052_isAcceleratorAware() const
1053{
1055}
1056
1057/*---------------------------------------------------------------------------*/
1058/*---------------------------------------------------------------------------*/
1059
1060} // End namespace Arcane
1061
1062/*---------------------------------------------------------------------------*/
1063/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro returning the pointer ptr if it is not null or throwing an exception if it is null.
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
List of message exchange functions.
Modifiable view of an array of type T.
constexpr Integer size() const noexcept
Returns the size of the array.
void add(ConstReferenceType val)
Adds element val to the end of the array.
const T * data() const
Access to the root of the array without any protection.
Constant view of an array of type T.
constexpr Integer size() const noexcept
Number of elements in the array.
Operations to access variable values from another subdomain.
Interface of the input/output manager.
Definition IIOMng.h:37
Interface of an entity family.
Definition IItemFamily.h:83
virtual ItemGroup allItems() const =0
Group of all entities.
Information exchange between processors.
Interface of the parallelism manager for a subdomain.
virtual ITraceMng * traceMng() const =0
Trace manager.
virtual void build()=0
Constructs the instance.
Information on parallel subdomain replication.
Information on the computing core allocation topology.
virtual void flush()=0
Flushes all streams.
virtual TraceMessage info()=0
Stream for an information message.
Sends values across different processors.
Interface of a specific MPI communicator for synchronizations.
Interface of a variable synchronization service.
Mesh entity group.
Definition ItemGroup.h:51
eItemKind itemKind() const
Group kind. This is the kind of its elements.
Definition ItemGroup.h:114
bool isAllItems() const
Indicates if the group is that of all entities.
Definition ItemGroup.cc:607
Information about the source of a message.
void _wait(eWaitType wait_type) override
Performs the wait or test.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Receiving message.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Sending message.
Information for sending/receiving a point-to-point message.
Serializing message using a BasicSerializer.
static MessageTag defaultTag()
Default tag for serialization messages.
Manages the MPI_Datatypes associated with Arcane types.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, IItemFamily *family) override
Returns an interface to synchronize variables on the group of the family family.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, const ItemGroup &group) override
Returns an interface to synchronize variables on the group group.
void initializeWindowCreator() override
Method allowing the initialization of the windowCreator specific to the implementation.
Ref< IContigMachineShMemWinBaseInternal > createContigMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Method allowing the creation of a memory window on the node.
Ref< IMachineShMemWinBaseInternal > createMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Method allowing the creation of a dynamic memory window on the node.
bool isMachineShMemWinAvailable() override
Method allowing to know if shared memory mode is supported.
ConstArrayView< Int32 > machineRanks() override
Method allowing retrieval of the ranks of the sub-domains of the computing node.
void machineBarrier() override
Method allowing a barrier for the sub-domains of the computing node.
Int32 masterParallelIORank() const override
Int32 nbSendersToMasterParallelIO() const override
MemoryAllocationOptions machineShMemWinMemoryAllocator() override
Method allowing retrieval of a shared memory allocator.
Specialization of MpiRequestList for MpiParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Performs the wait or test.
Parallelism manager using MPI.
IParallelMng * worldParallelMng() const override
Parallelism manager over all allocated resources.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Probes if messages are available.
void barrier() override
Performs a barrier.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Blocks until one of the requests in requests has completed.
void waitAllRequests(ArrayView< Request > requests) override
Blocks until all the requests in requests have completed.
MessageId probe(const PointToPointMessageInfo &message) override
Probes if messages are available.
void build() override
Constructs the instance.
void printStats() override
Prints statistics related to this parallelism manager.
bool m_is_initialized
true if already initialized
IParallelMng * sequentialParallelMng() override
Returns a sequential parallelism manager.
IThreadMng * threadMng() const override
Thread manager.
ITimerMng * timerMng() const override
Timer manager.
void initialize() override
Initializes the parallelism manager.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Returns an interface for synchronizing variables on the group of the family.
ISerializeMessage * createSendSerializer(Int32 rank) override
Creates a non-blocking message to send serialized data to rank rank.
bool isParallel() const override
Returns true if the execution is parallel.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Factory for utility functions.
ITraceMng * traceMng() const override
Trace manager.
IParallelTopology * createTopology() override
Creates an instance containing information about the rank topology of this manager.
Communicator communicator() const override
MPI communicator associated with this manager.
IParallelExchanger * createExchanger() override
Returns an interface for transferring messages between processors.
IParallelReplication * replication() const override
Replication information.
void setReplication(IParallelReplication *v) override
Sets the replication information.
Ref< Parallel::IRequestList > createRequestListRef() override
Creates a request list for this manager.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Returns an operation to transfer values between subdomains.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Tests whether one of the requests in requests has completed.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Returns an operation to retrieve the values of a variable on the entities of another subdomain.
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Creates a non-blocking message to receive serialized data from rank rank.
void freeRequests(ArrayView< Parallel::Request > requests) override
Frees the requests.
Timer manager using the MPI library.
Definition MpiTimerMng.h:41
void compute() override
Recalculates the synchronization information.
Redirects the message management of sub-domains according to the argument type.
IMessagePassingMng * messagePassingMng() const override
Associated Arccore message passing manager.
ITimeMetricCollector * timeMetricCollector() const override
Arccore temporal statistics collector (can be null).
ITimeStats * timeStats() const override
Associated statistics manager (can be null).
Base class of a factory for IParallelMng utility functions.
Information on parallel subdomain replication.
InstanceType * get() const
Associated instance or nullptr if none.
Reference to an instance.
bool null() const
Returns true if the string is null.
Definition String.cc:306
Sets the phase of the currently executing action.
Definition Timer.h:142
1D data vector with value semantics (STL style).
MPI_Comm communicator() const override
Retrieves the specific communicator from the topology.
void compute(VariableSynchronizer *var_syncer) override
Calculates the specific communicator.
Interface of a variable synchronization service.
void compute() override
Creation of the list of synchronization elements.
Int32ConstArrayView communicatingRanks() override
Ranks of subdomains with which communication occurs.
Declarations of types and methods used by message exchange mechanisms.
String getEnvironmentVariable(const String &name)
Environment variable named name.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
std::int8_t Int8
Signed integer type of 8 bits.
Ref< TrueType > createRef(Args &&... args)
Creates an instance of type TrueType with arguments Args and returns a reference to it.
ARCANE_MPI_EXPORT bool arcaneIsAcceleratorAwareMPI()
Indicates if the current MPI runtime supports accelerators.
Definition ArcaneMpi.cc:85
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
Array< Byte > ByteArray
Dynamic one-dimensional array of characters.
Definition UtilsTypes.h:121
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
Definition UtilsTypes.h:482
@ IK_Cell
Cell mesh entity.
void arcaneCallFunctionAndTerminateIfThrow(std::function< void()> function)
Calls the function function and calls std::terminate() if an exception occurs.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.
Info to construct an MpiParallelMng.
Information to construct a SequentialParallelMng.