Arcane  v4.1.7.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
MpiParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiParallelMng.cc (C) 2000-2026 */
9/* */
10/* Gestionnaire de parallélisme utilisant MPI. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/Collection.h"
15#include "arcane/utils/Enumerator.h"
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/TimeoutException.h"
19#include "arcane/utils/NotImplementedException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/ITraceMng.h"
22#include "arcane/utils/ValueConvert.h"
23#include "arcane/utils/Exception.h"
24#include "arcane/utils/HPReal.h"
25
26#include "arcane/core/IIOMng.h"
27#include "arcane/core/Timer.h"
28#include "arcane/core/IItemFamily.h"
29#include "arcane/core/IParallelTopology.h"
30#include "arcane/core/parallel/IStat.h"
31#include "arcane/core/internal/SerializeMessage.h"
32#include "arcane/core/internal/ParallelMngInternal.h"
33#include "arcane/core/internal/MachineShMemWinMemoryAllocator.h"
34
35#include "arcane/parallel/mpi/MpiParallelMng.h"
36#include "arcane/parallel/mpi/MpiParallelDispatch.h"
37#include "arcane/parallel/mpi/MpiTimerMng.h"
38#include "arcane/parallel/mpi/MpiSerializeMessage.h"
39#include "arcane/parallel/mpi/MpiParallelNonBlockingCollective.h"
40#include "arcane/parallel/mpi/MpiDatatype.h"
41#include "arcane/parallel/mpi/IVariableSynchronizerMpiCommunicator.h"
42
43#include "arcane/impl/ParallelReplication.h"
44#include "arcane/impl/SequentialParallelMng.h"
45#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
46#include "arcane/impl/internal/VariableSynchronizer.h"
47
48#include "arccore/message_passing_mpi/MpiMessagePassingMng.h"
49#include "arccore/message_passing_mpi/internal/MpiSerializeDispatcher.h"
50#include "arccore/message_passing_mpi/internal/MpiRequestList.h"
51#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
52#include "arccore/message_passing_mpi/internal/MpiLock.h"
53#include "arccore/message_passing_mpi/internal/MpiMachineShMemWinBaseInternalCreator.h"
54#include "arccore/message_passing_mpi/internal/MpiContigMachineShMemWinBaseInternal.h"
55#include "arccore/message_passing_mpi/internal/MpiMachineShMemWinBaseInternal.h"
56#include "arccore/message_passing/Dispatchers.h"
58#include "arccore/message_passing/internal/SerializeMessageList.h"
59
60#include "arcane_packages.h"
61
62//#define ARCANE_TRACE_MPI
63
64/*---------------------------------------------------------------------------*/
65/*---------------------------------------------------------------------------*/
66
67namespace Arcane
68{
69using namespace Arcane::MessagePassing;
70using namespace Arcane::MessagePassing::Mpi;
72
73/*---------------------------------------------------------------------------*/
74/*---------------------------------------------------------------------------*/
75
76extern "C++" IIOMng*
77arcaneCreateIOMng(IParallelMng* psm);
78
79#if defined(ARCANE_HAS_MPI_NEIGHBOR)
80// Défini dans MpiNeighborVariableSynchronizeDispatcher
82arcaneCreateMpiNeighborVariableSynchronizerFactory(MpiParallelMng* mpi_pm,
83 Ref<IVariableSynchronizerMpiCommunicator> synchronizer_communicator);
84#endif
86arcaneCreateMpiBlockVariableSynchronizerFactory(MpiParallelMng* mpi_pm, Int32 block_size, Int32 nb_sequence);
88arcaneCreateMpiVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
90arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
92arcaneCreateMpiLegacyVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
93#if defined(ARCANE_HAS_PACKAGE_NCCL)
95arcaneCreateNCCLVariableSynchronizerFactory(IParallelMng* mpi_pm);
96#endif
97
98/*---------------------------------------------------------------------------*/
99/*---------------------------------------------------------------------------*/
100
101MpiParallelMngBuildInfo::
102MpiParallelMngBuildInfo(MPI_Comm comm, MPI_Comm machine_comm)
103: is_parallel(false)
104, comm_rank(MessagePassing::A_NULL_RANK)
105, comm_nb_rank(0)
106, stat(nullptr)
107, trace_mng(nullptr)
108, timer_mng(nullptr)
109, thread_mng(nullptr)
110, mpi_comm(comm)
111, mpi_machine_comm(machine_comm)
112, is_mpi_comm_owned(true)
113, mpi_lock(nullptr)
114{
115 ::MPI_Comm_rank(comm,&comm_rank);
116 ::MPI_Comm_size(comm,&comm_nb_rank);
117
118 m_dispatchers_ref = createRef<MP::Dispatchers>();
119 MP::Mpi::MpiMessagePassingMng::BuildInfo bi(comm_rank,comm_nb_rank,m_dispatchers_ref.get(),mpi_comm);
120
121 m_message_passing_mng_ref = createRef<MP::Mpi::MpiMessagePassingMng>(bi);
122}
123
124/*---------------------------------------------------------------------------*/
125/*---------------------------------------------------------------------------*/
126
127/*---------------------------------------------------------------------------*/
128/*---------------------------------------------------------------------------*/
132class VariableSynchronizerMpiCommunicator
134{
135 public:
136 explicit VariableSynchronizerMpiCommunicator(MpiParallelMng* pm)
137 : m_mpi_parallel_mng(pm){}
138 ~VariableSynchronizerMpiCommunicator() override
139 {
140 _checkFreeCommunicator();
141 }
142 MPI_Comm communicator() const override
143 {
144 return m_topology_communicator;
145 }
146 void compute(VariableSynchronizer* var_syncer) override
147 {
148 Int32ConstArrayView comm_ranks = var_syncer->communicatingRanks();
149 const Int32 nb_message = comm_ranks.size();
150
151 MpiParallelMng* pm = m_mpi_parallel_mng;
152
153 MPI_Comm old_comm = pm->communicator();
154
155 UniqueArray<int> destinations(nb_message);
156 for( Integer i=0; i<nb_message; ++i ){
157 destinations[i] = comm_ranks[i];
158 }
159
160 _checkFreeCommunicator();
161
162 int r = MPI_Dist_graph_create_adjacent(old_comm, nb_message, destinations.data(), MPI_UNWEIGHTED,
163 nb_message, destinations.data(), MPI_UNWEIGHTED,
164 MPI_INFO_NULL, 0, &m_topology_communicator);
165
166 if (r!=MPI_SUCCESS)
167 ARCANE_FATAL("Error '{0}' in MPI_Dist_graph_create",r);
168
169 // Vérifie que l'ordre des rangs pour l'implémentation MPI est le même que celui qu'on a dans
170 // le VariableSynchronizer.
171 {
172 int indegree = 0;
173 int outdegree = 0;
174 int weighted = 0;
175 MPI_Dist_graph_neighbors_count(m_topology_communicator,&indegree,&outdegree,&weighted);
176
177 if (indegree!=nb_message)
178 ARCANE_FATAL("Bad value '{0}' for 'indegree' (expected={1})",indegree,nb_message);
179 if (outdegree!=nb_message)
180 ARCANE_FATAL("Bad value '{0}' for 'outdegree' (expected={1})",outdegree,nb_message);
181
182 UniqueArray<int> srcs(indegree);
183 UniqueArray<int> dsts(outdegree);
184
185 MPI_Dist_graph_neighbors(m_topology_communicator,indegree,srcs.data(),MPI_UNWEIGHTED,outdegree,dsts.data(),MPI_UNWEIGHTED);
186
187 for(int k=0; k<outdegree; ++k){
188 int x = dsts[k];
189 if (x!=comm_ranks[k])
190 ARCANE_FATAL("Invalid destination rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
191 }
192
193 for(int k=0; k<indegree; ++k ){
194 int x = srcs[k];
195 if (x!=comm_ranks[k])
196 ARCANE_FATAL("Invalid source rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
197 }
198 }
199 }
200
201 private:
202
203 MpiParallelMng* m_mpi_parallel_mng = nullptr;
204 MPI_Comm m_topology_communicator = MPI_COMM_NULL;
205
206 private:
207
208 void _checkFreeCommunicator()
209 {
210 if (m_topology_communicator!=MPI_COMM_NULL)
211 MPI_Comm_free(&m_topology_communicator);
212 m_topology_communicator = MPI_COMM_NULL;
213 }
214};
215
216/*---------------------------------------------------------------------------*/
217/*---------------------------------------------------------------------------*/
224class MpiVariableSynchronizer
225: public VariableSynchronizer
226{
227 public:
228 MpiVariableSynchronizer(IParallelMng* pm,const ItemGroup& group,
229 Ref<IDataSynchronizeImplementationFactory> implementation_factory,
231 : VariableSynchronizer(pm,group,implementation_factory)
232 , m_topology_info(topology_info)
233 {
234 }
235 public:
236 void compute() override
237 {
239 // Si non nul, calcule la topologie
240 if (m_topology_info.get())
241 m_topology_info->compute(this);
242 }
243 private:
245};
246
247/*---------------------------------------------------------------------------*/
248/*---------------------------------------------------------------------------*/
249
250class MpiParallelMngUtilsFactory
252{
253 public:
254 MpiParallelMngUtilsFactory()
255 : m_synchronizer_version(2)
256 {
257 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="1")
258 m_synchronizer_version = 1;
259 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="2")
260 m_synchronizer_version = 2;
261 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="3")
262 m_synchronizer_version = 3;
263 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="4"){
264 m_synchronizer_version = 4;
265 String v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_BLOCK_SIZE");
266 if (!v.null()){
267 Int32 block_size = 0;
268 if (!builtInGetValue(block_size,v))
269 m_synchronize_block_size = block_size;
270 m_synchronize_block_size = std::clamp(m_synchronize_block_size,0,1000000000);
271 }
272 v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_NB_SEQUENCE");
273 if (!v.null()){
274 Int32 nb_sequence = 0;
275 if (!builtInGetValue(nb_sequence,v))
276 m_synchronize_nb_sequence = nb_sequence;
277 m_synchronize_nb_sequence = std::clamp(m_synchronize_nb_sequence,1,1024*1024);
278 }
279 }
280 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="5")
281 m_synchronizer_version = 5;
282 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="6")
283 m_synchronizer_version = 6;
284 }
285 public:
286
288 {
289 return _createSynchronizer(pm,family->allItems());
290 }
291
293 {
294 return _createSynchronizer(pm,group);
295 }
296
297 private:
298
299 Ref<IVariableSynchronizer> _createSynchronizer(IParallelMng* pm,const ItemGroup& group)
300 {
302 MpiParallelMng* mpi_pm = ARCANE_CHECK_POINTER(dynamic_cast<MpiParallelMng*>(pm));
303 ITraceMng* tm = pm->traceMng();
305 // N'affiche les informations que pour le groupe de toutes les mailles pour éviter d'afficher
306 // plusieurs fois le même message.
307 bool do_print = (group.isAllItems() && group.itemKind()==IK_Cell);
308 if (m_synchronizer_version == 2){
309 if (do_print)
310 tm->info() << "Using MpiSynchronizer V2";
311 generic_factory = arcaneCreateMpiVariableSynchronizerFactory(mpi_pm);
312 }
313 else if (m_synchronizer_version == 3 ){
314 if (do_print)
315 tm->info() << "Using MpiSynchronizer V3";
316 generic_factory = arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(mpi_pm);
317 }
318 else if (m_synchronizer_version == 4){
319 if (do_print)
320 tm->info() << "Using MpiSynchronizer V4 block_size=" << m_synchronize_block_size
321 << " nb_sequence=" << m_synchronize_nb_sequence;
322 generic_factory = arcaneCreateMpiBlockVariableSynchronizerFactory(mpi_pm,m_synchronize_block_size,m_synchronize_nb_sequence);
323 }
324 else if (m_synchronizer_version == 5){
325 if (do_print)
326 tm->info() << "Using MpiSynchronizer V5";
327 topology_info = createRef<VariableSynchronizerMpiCommunicator>(mpi_pm);
328#if defined(ARCANE_HAS_MPI_NEIGHBOR)
329 generic_factory = arcaneCreateMpiNeighborVariableSynchronizerFactory(mpi_pm,topology_info);
330#else
331 throw NotSupportedException(A_FUNCINFO,"Synchronize implementation V5 is not supported with this version of MPI");
332#endif
333 }
334#if defined(ARCANE_HAS_PACKAGE_NCCL)
335 else if (m_synchronizer_version == 6){
336 if (do_print)
337 tm->info() << "Using NCCLSynchronizer";
338 generic_factory = arcaneCreateNCCLVariableSynchronizerFactory(mpi_pm);
339 }
340#endif
341 else{
342 if (do_print)
343 tm->info() << "Using MpiSynchronizer V1";
344 generic_factory = arcaneCreateMpiLegacyVariableSynchronizerFactory(mpi_pm);
345 }
346 if (!generic_factory.get())
347 ARCANE_FATAL("No factory created");
348 return createRef<MpiVariableSynchronizer>(pm,group,generic_factory,topology_info);
349 }
350
351 private:
352
353 Integer m_synchronizer_version = 1;
354 Int32 m_synchronize_block_size = 32000;
355 Int32 m_synchronize_nb_sequence = 1;
356};
357
358/*---------------------------------------------------------------------------*/
359/*---------------------------------------------------------------------------*/
360
361/*---------------------------------------------------------------------------*/
362/*---------------------------------------------------------------------------*/
363
365: public ParallelMngInternal
366{
367 public:
368
369 explicit Impl(MpiParallelMng* pm)
370 : ParallelMngInternal(pm)
371 , m_parallel_mng(pm)
372 , m_alloc(makeRef(new MachineShMemWinMemoryAllocator(pm)))
373 {}
374
375 ~Impl() override = default;
376
377 public:
378
379 Int32 masterParallelIORank() const override { return m_parallel_mng->commRank(); }
380 Int32 nbSendersToMasterParallelIO() const override { return 1; }
381
383 {
384 m_parallel_mng->traceMng()->info() << "initializeWindowCreator() MPI";
385 m_parallel_mng->adapter()->initializeWindowCreator(m_parallel_mng->machineCommunicator());
386 }
387
389 {
390 if (m_shmem_available == 1) {
391 return true;
392 }
393
394 if (m_shmem_available == 0) {
395 Ref<IParallelTopology> topo = m_parallel_mng->_internalUtilsFactory()->createTopology(m_parallel_mng);
396 if (topo->machineRanks().size() == m_parallel_mng->adapter()->windowCreator()->machineRanks().size()) {
397 m_shmem_available = 1;
398 return true;
399 }
400 // Problème avec MPI. Peut intervenir si MPICH est compilé en mode ch3:sock.
401 m_shmem_available = 2;
402 return false;
403 }
404
405 return false;
406 }
407
409 {
410 return makeRef(m_parallel_mng->adapter()->windowCreator()->createWindow(sizeof_segment, sizeof_type));
411 }
412
414 {
415 return makeRef(m_parallel_mng->adapter()->windowCreator()->createDynamicWindow(sizeof_segment, sizeof_type));
416 }
417
419 {
420 return MemoryAllocationOptions{ m_alloc.get() };
421 }
422
424 {
425 return m_parallel_mng->adapter()->windowCreator()->machineRanks();
426 }
427
428 void machineBarrier() override
429 {
430 m_parallel_mng->adapter()->windowCreator()->machineBarrier();
431 }
432
433 private:
434
435 MpiParallelMng* m_parallel_mng;
437
438 // 0 = Attribut non initialisé
439 // 1 = Mémoire partagée dispo
440 // 2 = Mémoire partagée non dispo
441 Int8 m_shmem_available = 0;
442};
443
444/*---------------------------------------------------------------------------*/
445/*---------------------------------------------------------------------------*/
446
447/*---------------------------------------------------------------------------*/
448/*---------------------------------------------------------------------------*/
449
450MpiParallelMng::
451MpiParallelMng(const MpiParallelMngBuildInfo& bi)
452: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.dispatchersRef(),bi.messagePassingMngRef()))
453, m_trace(bi.trace_mng)
454, m_thread_mng(bi.thread_mng)
455, m_world_parallel_mng(bi.world_parallel_mng)
456, m_timer_mng(bi.timer_mng)
457, m_replication(new ParallelReplication())
458, m_is_parallel(bi.is_parallel)
459, m_comm_rank(bi.commRank())
460, m_comm_size(bi.commSize())
461, m_stat(bi.stat)
462, m_communicator(bi.mpiComm())
463, m_machine_communicator(bi.mpiMachineComm())
464, m_is_communicator_owned(bi.is_mpi_comm_owned)
465, m_mpi_lock(bi.mpi_lock)
466, m_non_blocking_collective(nullptr)
467, m_utils_factory(createRef<MpiParallelMngUtilsFactory>())
468, m_parallel_mng_internal(new Impl(this))
469{
470 if (!m_world_parallel_mng){
471 m_trace->debug()<<"[MpiParallelMng] No m_world_parallel_mng found, reverting to ourselves!";
472 m_world_parallel_mng = this;
473 }
474}
475
476/*---------------------------------------------------------------------------*/
477/*---------------------------------------------------------------------------*/
478
479MpiParallelMng::
480~MpiParallelMng()
481{
482 delete m_parallel_mng_internal;
483 delete m_non_blocking_collective;
484 m_sequential_parallel_mng.reset();
485 if (m_is_communicator_owned){
486 MpiLock::Section ls(m_mpi_lock);
487 MPI_Comm_free(&m_communicator);
488 MPI_Comm_free(&m_machine_communicator);
489 }
490 delete m_replication;
491 delete m_io_mng;
492 if (m_is_timer_owned)
493 delete m_timer_mng;
494 arcaneCallFunctionAndTerminateIfThrow([&]() { m_adapter->destroy(); });
495 delete m_datatype_list;
496}
497
498/*---------------------------------------------------------------------------*/
499/*---------------------------------------------------------------------------*/
500
501namespace
502{
503
504/*---------------------------------------------------------------------------*/
505/*---------------------------------------------------------------------------*/
506// Classe pour créer les différents dispatchers
507class DispatchCreator
508{
509 public:
510 DispatchCreator(ITraceMng* tm,IMessagePassingMng* mpm,MpiAdapter* adapter,MpiDatatypeList* datatype_list)
511 : m_tm(tm), m_mpm(mpm), m_adapter(adapter), m_datatype_list(datatype_list){}
512 public:
513 template<typename DataType> MpiParallelDispatchT<DataType>*
514 create()
515 {
516 MpiDatatype* dt = m_datatype_list->datatype(DataType());
517 return new MpiParallelDispatchT<DataType>(m_tm,m_mpm,m_adapter,dt);
518 }
519
520 ITraceMng* m_tm;
521 IMessagePassingMng* m_mpm;
522 MpiAdapter* m_adapter;
523 MpiDatatypeList* m_datatype_list;
524};
525
526/*---------------------------------------------------------------------------*/
527/*---------------------------------------------------------------------------*/
528
529class ControlDispatcherDecorator
530: public ParallelMngDispatcher::DefaultControlDispatcher
531{
532 public:
533
534 ControlDispatcherDecorator(IParallelMng* pm, MpiAdapter* adapter)
535 : ParallelMngDispatcher::DefaultControlDispatcher(pm), m_adapter(adapter) {}
536
537 IMessagePassingMng* commSplit(bool keep) override
538 {
539 return m_adapter->commSplit(keep);
540 }
541 MP::IProfiler* profiler() const override { return m_adapter->profiler(); }
542 void setProfiler(MP::IProfiler* p) override { m_adapter->setProfiler(p); }
543
544 private:
545 MpiAdapter* m_adapter;
546};
547}
548
549/*---------------------------------------------------------------------------*/
550/*---------------------------------------------------------------------------*/
551
553build()
554{
555 ITraceMng* tm = traceMng();
556 if (!m_timer_mng){
557 m_timer_mng = new MpiTimerMng(tm);
558 m_is_timer_owned = true;
559 }
560
561 // Créé le gestionnaire séquentiel associé.
562 {
564 bi.setTraceMng(traceMng());
565 bi.setCommunicator(communicator());
566 bi.setThreadMng(threadMng());
567 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
568 }
569
570 // Indique que les reduces doivent être fait dans l'ordre des processeurs
571 // afin de garantir une exécution déterministe
572 bool is_ordered_reduce = false;
573 if (platform::getEnvironmentVariable("ARCANE_ORDERED_REDUCE")=="TRUE")
574 is_ordered_reduce = true;
575 m_datatype_list = new MpiDatatypeList(is_ordered_reduce);
576
577 ARCANE_CHECK_POINTER(m_stat);
578
579 MpiAdapter* adapter = new MpiAdapter(m_trace,m_stat->toArccoreStat(),m_communicator,m_mpi_lock);
580 m_adapter = adapter;
581 auto mpm = _messagePassingMng();
582
583 // NOTE: cette instance sera détruite par le ParallelMngDispatcher
584 auto* control_dispatcher = new ControlDispatcherDecorator(this,m_adapter);
585 _setControlDispatcher(control_dispatcher);
586
587 // NOTE: cette instance sera détruite par le ParallelMngDispatcher
588 auto* serialize_dispatcher = new MpiSerializeDispatcher(m_adapter, mpm);
589 m_mpi_serialize_dispatcher = serialize_dispatcher;
590 _setSerializeDispatcher(serialize_dispatcher);
591
592 DispatchCreator creator(m_trace,mpm,m_adapter,m_datatype_list);
593 this->createDispatchers(creator);
594
595 m_io_mng = arcaneCreateIOMng(this);
596
597 m_non_blocking_collective = new MpiParallelNonBlockingCollective(tm,this,adapter);
598 m_non_blocking_collective->build();
599 if (m_mpi_lock)
600 m_trace->info() << "Using mpi with locks.";
601
602 m_parallel_mng_internal->initializeWindowCreator();
603}
604
605/*---------------------------------------------------------------------------*/
606/*---------------------------------------------------------------------------*/
607
608/*----------------------------------------------------------------------------*/
609/*---------------------------------------------------------------------------*/
610
613{
614 Trace::Setter mci(m_trace,"Mpi");
615 if (m_is_initialized){
616 m_trace->warning() << "MpiParallelMng already initialized";
617 return;
618 }
619
620 m_trace->info() << "Initialisation de MpiParallelMng";
621 m_sequential_parallel_mng->initialize();
622
623 m_adapter->setTimeMetricCollector(timeMetricCollector());
624
625 m_is_initialized = true;
626}
627
628/*---------------------------------------------------------------------------*/
629/*---------------------------------------------------------------------------*/
630
631void MpiParallelMng::
632sendSerializer(ISerializer* s,Int32 rank)
633{
634 Trace::Setter mci(m_trace,"Mpi");
635 Timer::Phase tphase(timeStats(),TP_Communication);
637 m_mpi_serialize_dispatcher->legacySendSerializer(s,{MessageRank(rank),mpi_tag,Blocking});
638}
639
640/*---------------------------------------------------------------------------*/
641/*---------------------------------------------------------------------------*/
642
645{
646 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
647}
648
649/*---------------------------------------------------------------------------*/
650/*---------------------------------------------------------------------------*/
651
652Request MpiParallelMng::
653sendSerializer(ISerializer* s,Int32 rank,[[maybe_unused]] ByteArray& bytes)
654{
655 Trace::Setter mci(m_trace,"Mpi");
656 Timer::Phase tphase(timeStats(),TP_Communication);
658 return m_mpi_serialize_dispatcher->legacySendSerializer(s,{MessageRank(rank),mpi_tag,NonBlocking});
659}
660
661/*---------------------------------------------------------------------------*/
662/*---------------------------------------------------------------------------*/
663
664void MpiParallelMng::
665broadcastSerializer(ISerializer* values,Int32 rank)
666{
667 Timer::Phase tphase(timeStats(),TP_Communication);
668 m_mpi_serialize_dispatcher->broadcastSerializer(values,MessageRank(rank));
669}
670
671/*---------------------------------------------------------------------------*/
672/*---------------------------------------------------------------------------*/
673
674void MpiParallelMng::
675recvSerializer(ISerializer* values,Int32 rank)
676{
677 Trace::Setter mci(m_trace,"Mpi");
678 Timer::Phase tphase(timeStats(),TP_Communication);
680 m_mpi_serialize_dispatcher->legacyReceiveSerializer(values,MessageRank(rank),mpi_tag);
681}
682
683/*---------------------------------------------------------------------------*/
684/*---------------------------------------------------------------------------*/
685
688{
689 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
690}
691
692/*---------------------------------------------------------------------------*/
693/*---------------------------------------------------------------------------*/
694
696probe(const PointToPointMessageInfo& message) -> MessageId
697{
698 return m_adapter->probeMessage(message);
699}
700
701/*---------------------------------------------------------------------------*/
702/*---------------------------------------------------------------------------*/
703
706{
707 return m_adapter->legacyProbeMessage(message);
708}
709
710/*---------------------------------------------------------------------------*/
711/*---------------------------------------------------------------------------*/
712
713Request MpiParallelMng::
714sendSerializer(const ISerializer* s,const PointToPointMessageInfo& message)
715{
716 return m_mpi_serialize_dispatcher->sendSerializer(s,message);
717}
718
719/*---------------------------------------------------------------------------*/
720/*---------------------------------------------------------------------------*/
721
722Request MpiParallelMng::
723receiveSerializer(ISerializer* s,const PointToPointMessageInfo& message)
724{
725 return m_mpi_serialize_dispatcher->receiveSerializer(s,message);
726}
727
728/*---------------------------------------------------------------------------*/
729/*---------------------------------------------------------------------------*/
730
733{
734 for( Integer i=0, is=requests.size(); i<is; ++i )
735 m_adapter->freeRequest(requests[i]);
736}
737
738/*---------------------------------------------------------------------------*/
739/*---------------------------------------------------------------------------*/
740
741void MpiParallelMng::
742_checkFinishedSubRequests()
743{
744 m_mpi_serialize_dispatcher->checkFinishedSubRequests();
745}
746
747/*---------------------------------------------------------------------------*/
748/*---------------------------------------------------------------------------*/
749
750Ref<IParallelMng> MpiParallelMng::
751sequentialParallelMngRef()
752{
753 return m_sequential_parallel_mng;
754}
755
758{
759 return m_sequential_parallel_mng.get();
760}
761
762/*---------------------------------------------------------------------------*/
763/*---------------------------------------------------------------------------*/
764
767{
768 if (m_stat)
769 m_stat->print(m_trace);
770}
771
772/*---------------------------------------------------------------------------*/
773/*---------------------------------------------------------------------------*/
774
776barrier()
777{
778 traceMng()->flush();
779 m_adapter->barrier();
780}
781
782/*---------------------------------------------------------------------------*/
783/*---------------------------------------------------------------------------*/
784
787{
788 m_adapter->waitAllRequests(requests);
789 _checkFinishedSubRequests();
790}
791
792/*---------------------------------------------------------------------------*/
793/*---------------------------------------------------------------------------*/
794
797{
798 return _waitSomeRequests(requests, false);
799}
800
801/*---------------------------------------------------------------------------*/
802/*---------------------------------------------------------------------------*/
803
806{
807 return _waitSomeRequests(requests, true);
808}
809
810/*---------------------------------------------------------------------------*/
811/*---------------------------------------------------------------------------*/
812
813UniqueArray<Integer> MpiParallelMng::
814_waitSomeRequests(ArrayView<Request> requests, bool is_non_blocking)
815{
816 UniqueArray<Integer> results;
817 UniqueArray<bool> done_indexes(requests.size());
818
819 m_adapter->waitSomeRequests(requests, done_indexes, is_non_blocking);
820 for (int i = 0 ; i < requests.size() ; i++) {
821 if (done_indexes[i])
822 results.add(i);
823 }
824 return results;
825}
826
827/*---------------------------------------------------------------------------*/
828/*---------------------------------------------------------------------------*/
829
830ISerializeMessageList* MpiParallelMng::
831_createSerializeMessageList()
832{
833 return new MP::internal::SerializeMessageList(messagePassingMng());
834}
835
836/*---------------------------------------------------------------------------*/
837/*---------------------------------------------------------------------------*/
838
841{
842 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
843}
844
845/*---------------------------------------------------------------------------*/
846/*---------------------------------------------------------------------------*/
847
850{
851 return m_utils_factory->createTransferValuesOperation(this)._release();
852}
853
854/*---------------------------------------------------------------------------*/
855/*---------------------------------------------------------------------------*/
856
859{
860 return m_utils_factory->createExchanger(this)._release();
861}
862
863/*---------------------------------------------------------------------------*/
864/*---------------------------------------------------------------------------*/
865
868{
869 return m_utils_factory->createSynchronizer(this,family)._release();
870}
871
872/*---------------------------------------------------------------------------*/
873/*---------------------------------------------------------------------------*/
874
876createSynchronizer(const ItemGroup& group)
877{
878 return m_utils_factory->createSynchronizer(this,group)._release();
879}
880
881/*---------------------------------------------------------------------------*/
882/*---------------------------------------------------------------------------*/
883
886{
887 return m_utils_factory->createTopology(this)._release();
888}
889
890/*---------------------------------------------------------------------------*/
891/*---------------------------------------------------------------------------*/
892
894replication() const
895{
896 return m_replication;
897}
898
899/*---------------------------------------------------------------------------*/
900/*---------------------------------------------------------------------------*/
901
904{
905 delete m_replication;
906 m_replication = v;
907}
908
909/*---------------------------------------------------------------------------*/
910/*---------------------------------------------------------------------------*/
911
912IParallelMng* MpiParallelMng::
913_createSubParallelMng(MPI_Comm sub_communicator)
914{
915 // Si nul, ce rang ne fait pas partie du sous-communicateur
916 if (sub_communicator==MPI_COMM_NULL)
917 return nullptr;
918
919 int sub_rank = -1;
920 MPI_Comm_rank(sub_communicator,&sub_rank);
921
922 MPI_Comm sub_machine_communicator = MPI_COMM_NULL;
923 MPI_Comm_split_type(sub_communicator, MPI_COMM_TYPE_SHARED, sub_rank, MPI_INFO_NULL, &sub_machine_communicator);
924
925 MpiParallelMngBuildInfo bi(sub_communicator, sub_machine_communicator);
926 bi.is_parallel = isParallel();
927 bi.stat = m_stat;
928 bi.timer_mng = m_timer_mng;
929 bi.thread_mng = m_thread_mng;
930 bi.trace_mng = m_trace;
931 bi.world_parallel_mng = m_world_parallel_mng;
932 bi.mpi_lock = m_mpi_lock;
933
934 IParallelMng* sub_pm = new MpiParallelMng(bi);
935 sub_pm->build();
936 return sub_pm;
937}
938
939/*---------------------------------------------------------------------------*/
940/*---------------------------------------------------------------------------*/
941
942Ref<IParallelMng> MpiParallelMng::
943_createSubParallelMngRef(Int32 color, Int32 key)
944{
945 if (color < 0)
946 color = MPI_UNDEFINED;
947 MPI_Comm sub_communicator = MPI_COMM_NULL;
948 MPI_Comm_split(m_communicator, color, key, &sub_communicator);
949 IParallelMng* sub_pm = _createSubParallelMng(sub_communicator);
950 return makeRef(sub_pm);
951}
952
953/*---------------------------------------------------------------------------*/
954/*---------------------------------------------------------------------------*/
955
956IParallelMng* MpiParallelMng::
957_createSubParallelMng(Int32ConstArrayView kept_ranks)
958{
959 MPI_Group mpi_group = MPI_GROUP_NULL;
960 MPI_Comm_group(m_communicator, &mpi_group);
961 Integer nb_sub_rank = kept_ranks.size();
962 UniqueArray<int> mpi_kept_ranks(nb_sub_rank);
963 for (Integer i = 0; i < nb_sub_rank; ++i)
964 mpi_kept_ranks[i] = (int)kept_ranks[i];
965
966 MPI_Group final_group = MPI_GROUP_NULL;
967 MPI_Group_incl(mpi_group, nb_sub_rank, mpi_kept_ranks.data(), &final_group);
968 MPI_Comm sub_communicator = MPI_COMM_NULL;
969
970 MPI_Comm_create(m_communicator, final_group, &sub_communicator);
971 MPI_Group_free(&final_group);
972 return _createSubParallelMng(sub_communicator);
973}
974
975/*---------------------------------------------------------------------------*/
976/*---------------------------------------------------------------------------*/
985: public MpiRequestList
986{
987 using Base = MpiRequestList;
988 public:
989 explicit RequestList(MpiParallelMng* pm)
990 : Base(pm->m_adapter), m_parallel_mng(pm){}
991 public:
992 void _wait(Parallel::eWaitType wait_type) override
993 {
994 Base::_wait(wait_type);
995 m_parallel_mng->_checkFinishedSubRequests();
996 };
997 private:
998 MpiParallelMng* m_parallel_mng;
999};
1000
1001/*---------------------------------------------------------------------------*/
1002/*---------------------------------------------------------------------------*/
1003
1009
1010/*---------------------------------------------------------------------------*/
1011/*---------------------------------------------------------------------------*/
1012
1015{
1016 return m_utils_factory;
1017}
1018
1019/*---------------------------------------------------------------------------*/
1020/*---------------------------------------------------------------------------*/
1021
1022bool MpiParallelMng::
1023_isAcceleratorAware() const
1024{
1026}
1027
1028/*---------------------------------------------------------------------------*/
1029/*---------------------------------------------------------------------------*/
1030
1031} // End namespace Arcane
1032
1033/*---------------------------------------------------------------------------*/
1034/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Vue modifiable d'un tableau d'un type T.
constexpr Integer size() const noexcept
Retourne la taille du tableau.
const T * data() const
Accès à la racine du tableau hors toute protection.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:36
Interface d'une famille d'entités.
Definition IItemFamily.h:84
virtual ItemGroup allItems() const =0
Groupe de toutes les entités.
Échange d'informations entre processeurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual ITraceMng * traceMng() const =0
Gestionnaire de traces.
virtual void build()=0
Construit l'instance.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface du gestionnaire de traces.
virtual void flush()=0
Flush tous les flots.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un communicateur MPI spécifique pour les synchronisations.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
eItemKind itemKind() const
Genre du groupe. Il s'agit du genre de ses éléments.
Definition ItemGroup.h:109
bool isAllItems() const
Indique si le groupe est celui de toutes les entités.
Definition ItemGroup.cc:610
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
void _wait(eWaitType wait_type) override
Effectue l'attente ou le test.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Message de réception.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Informations pour envoyer/recevoir un message point à point.
Requête d'un message.
Definition Request.h:77
Message de sérialisation utilisant un BasicSerializer.
static MessageTag defaultTag()
Tag par défaut pour les messages de sérialisation.
Gère les MPI_Datatype associées aux types Arcane.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, const ItemGroup &group) override
Retourne une interface pour synchroniser des variables sur le groupe group.
void initializeWindowCreator() override
Méthode permettant d'initialiser le windowCreator spécifique à l'implémentation.
Ref< IContigMachineShMemWinBaseInternal > createContigMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Ref< IMachineShMemWinBaseInternal > createMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire dynamique sur le noeud.
bool isMachineShMemWinAvailable() override
Méthode permettant de savoir si le mode mémoire partagée est supporté.
ConstArrayView< Int32 > machineRanks() override
Méthode permettant de récupérer les rangs des sous-domaines du noeud de calcul.
void machineBarrier() override
Méthode permettant de faire une barrière pour les sous-domaines du noeud de calcul.
Int32 masterParallelIORank() const override
Int32 nbSendersToMasterParallelIO() const override
MemoryAllocationOptions machineShMemWinMemoryAllocator() override
Méthode permettant de récupérer un allocateur en mémoire partagée.
Spécialisation de MpiRequestList pour MpiParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant MPI.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void barrier() override
Effectue une barière.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Bloque en attendant qu'une des requêtes rvalues soit terminée.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void build() override
Construit l'instance.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
bool m_is_initialized
true si déjà initialisé
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IThreadMng * threadMng() const override
Gestionnaire de threads.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void initialize() override
Initialise le gestionnaire du parallélisme.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
bool isParallel() const override
Retourne true si l'exécution est parallèle.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
IParallelReplication * replication() const override
Informations sur la réplication.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Test si une des requêtes rvalues est terminée.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
Gestionnaire du parallélisme utilisant MPI.
Gestionnaire de timer utisant la bibliothèque MPI.
Definition MpiTimerMng.h:39
void compute() override
Recalcule les infos de synchronisation.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeMetricCollector * timeMetricCollector() const override
Collecteur Arccore des statistiques temporelles (peut être nul)
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Chaîne de caractères unicode.
bool null() const
Retourne true si la chaîne est nulle.
Definition String.cc:305
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
MPI_Comm communicator() const override
Récupère le communicateur spécifique de la topologie.
void compute(VariableSynchronizer *var_syncer) override
Calcul le communicateur spécifique.
Interface d'un service de synchronisation de variable.
void compute() override
Création de la liste des éléments de synchronisation.
Int32ConstArrayView communicatingRanks() override
Rangs des sous-domaines avec lesquels on communique.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
ARCCORE_BASE_EXPORT String getEnvironmentVariable(const String &name)
Variable d'environnement du nom name.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
std::int8_t Int8
Type entier signé sur 8 bits.
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
ARCANE_MPI_EXPORT bool arcaneIsAcceleratorAwareMPI()
Indique si le runtime actuel de MPI a le support des accélérateurs.
Definition ArcaneMpi.cc:85
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:121
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:482
@ IK_Cell
Entité de maillage de genre maille.
void arcaneCallFunctionAndTerminateIfThrow(std::function< void()> function)
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un MpiParallelMng.
Infos pour construire un SequentialParallelMng.