Arcane  v4.1.4.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
MpiParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiParallelMng.cc (C) 2000-2026 */
9/* */
10/* Gestionnaire de parallélisme utilisant MPI. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/Collection.h"
15#include "arcane/utils/Enumerator.h"
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/TimeoutException.h"
19#include "arcane/utils/NotImplementedException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/ITraceMng.h"
22#include "arcane/utils/ValueConvert.h"
23#include "arcane/utils/Exception.h"
24#include "arcane/utils/HPReal.h"
25
26#include "arcane/core/IIOMng.h"
27#include "arcane/core/Timer.h"
28#include "arcane/core/IItemFamily.h"
29#include "arcane/core/parallel/IStat.h"
30#include "arcane/core/internal/SerializeMessage.h"
31#include "arcane/core/internal/ParallelMngInternal.h"
32#include "arcane/core/internal/DynamicMachineMemoryWindowMemoryAllocator.h"
33
34#include "arcane/parallel/mpi/MpiParallelMng.h"
35#include "arcane/parallel/mpi/MpiParallelDispatch.h"
36#include "arcane/parallel/mpi/MpiTimerMng.h"
37#include "arcane/parallel/mpi/MpiSerializeMessage.h"
38#include "arcane/parallel/mpi/MpiParallelNonBlockingCollective.h"
39#include "arcane/parallel/mpi/MpiDatatype.h"
40#include "arcane/parallel/mpi/IVariableSynchronizerMpiCommunicator.h"
41
42#include "arcane/impl/ParallelReplication.h"
43#include "arcane/impl/SequentialParallelMng.h"
44#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
45#include "arcane/impl/internal/VariableSynchronizer.h"
46
47#include "arccore/message_passing_mpi/MpiMessagePassingMng.h"
48#include "arccore/message_passing_mpi/internal/MpiSerializeDispatcher.h"
49#include "arccore/message_passing_mpi/internal/MpiRequestList.h"
50#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
51#include "arccore/message_passing_mpi/internal/MpiLock.h"
52#include "arccore/message_passing_mpi/internal/MpiMachineMemoryWindowBaseInternalCreator.h"
53#include "arccore/message_passing_mpi/internal/MpiMachineMemoryWindowBaseInternal.h"
54#include "arccore/message_passing_mpi/internal/MpiDynamicMachineMemoryWindowBaseInternal.h"
55#include "arccore/message_passing/Dispatchers.h"
57#include "arccore/message_passing/internal/SerializeMessageList.h"
58
59#include "arcane_packages.h"
60
61//#define ARCANE_TRACE_MPI
62
63/*---------------------------------------------------------------------------*/
64/*---------------------------------------------------------------------------*/
65
66namespace Arcane
67{
68using namespace Arcane::MessagePassing;
69using namespace Arcane::MessagePassing::Mpi;
71
72/*---------------------------------------------------------------------------*/
73/*---------------------------------------------------------------------------*/
74
75extern "C++" IIOMng*
76arcaneCreateIOMng(IParallelMng* psm);
77
78#if defined(ARCANE_HAS_MPI_NEIGHBOR)
79// Défini dans MpiNeighborVariableSynchronizeDispatcher
81arcaneCreateMpiNeighborVariableSynchronizerFactory(MpiParallelMng* mpi_pm,
82 Ref<IVariableSynchronizerMpiCommunicator> synchronizer_communicator);
83#endif
85arcaneCreateMpiBlockVariableSynchronizerFactory(MpiParallelMng* mpi_pm, Int32 block_size, Int32 nb_sequence);
87arcaneCreateMpiVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
89arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
91arcaneCreateMpiLegacyVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
92#if defined(ARCANE_HAS_PACKAGE_NCCL)
94arcaneCreateNCCLVariableSynchronizerFactory(IParallelMng* mpi_pm);
95#endif
96
97/*---------------------------------------------------------------------------*/
98/*---------------------------------------------------------------------------*/
99
100MpiParallelMngBuildInfo::
101MpiParallelMngBuildInfo(MPI_Comm comm, MPI_Comm machine_comm)
102: is_parallel(false)
103, comm_rank(MessagePassing::A_NULL_RANK)
104, comm_nb_rank(0)
105, stat(nullptr)
106, trace_mng(nullptr)
107, timer_mng(nullptr)
108, thread_mng(nullptr)
109, mpi_comm(comm)
110, mpi_machine_comm(machine_comm)
111, is_mpi_comm_owned(true)
112, mpi_lock(nullptr)
113{
114 ::MPI_Comm_rank(comm,&comm_rank);
115 ::MPI_Comm_size(comm,&comm_nb_rank);
116
117 m_dispatchers_ref = createRef<MP::Dispatchers>();
118 MP::Mpi::MpiMessagePassingMng::BuildInfo bi(comm_rank,comm_nb_rank,m_dispatchers_ref.get(),mpi_comm);
119
120 m_message_passing_mng_ref = createRef<MP::Mpi::MpiMessagePassingMng>(bi);
121}
122
123/*---------------------------------------------------------------------------*/
124/*---------------------------------------------------------------------------*/
125
126/*---------------------------------------------------------------------------*/
127/*---------------------------------------------------------------------------*/
131class VariableSynchronizerMpiCommunicator
133{
134 public:
135 explicit VariableSynchronizerMpiCommunicator(MpiParallelMng* pm)
136 : m_mpi_parallel_mng(pm){}
137 ~VariableSynchronizerMpiCommunicator() override
138 {
139 _checkFreeCommunicator();
140 }
141 MPI_Comm communicator() const override
142 {
143 return m_topology_communicator;
144 }
145 void compute(VariableSynchronizer* var_syncer) override
146 {
147 Int32ConstArrayView comm_ranks = var_syncer->communicatingRanks();
148 const Int32 nb_message = comm_ranks.size();
149
150 MpiParallelMng* pm = m_mpi_parallel_mng;
151
152 MPI_Comm old_comm = pm->communicator();
153
154 UniqueArray<int> destinations(nb_message);
155 for( Integer i=0; i<nb_message; ++i ){
156 destinations[i] = comm_ranks[i];
157 }
158
159 _checkFreeCommunicator();
160
161 int r = MPI_Dist_graph_create_adjacent(old_comm, nb_message, destinations.data(), MPI_UNWEIGHTED,
162 nb_message, destinations.data(), MPI_UNWEIGHTED,
163 MPI_INFO_NULL, 0, &m_topology_communicator);
164
165 if (r!=MPI_SUCCESS)
166 ARCANE_FATAL("Error '{0}' in MPI_Dist_graph_create",r);
167
168 // Vérifie que l'ordre des rangs pour l'implémentation MPI est le même que celui qu'on a dans
169 // le VariableSynchronizer.
170 {
171 int indegree = 0;
172 int outdegree = 0;
173 int weighted = 0;
174 MPI_Dist_graph_neighbors_count(m_topology_communicator,&indegree,&outdegree,&weighted);
175
176 if (indegree!=nb_message)
177 ARCANE_FATAL("Bad value '{0}' for 'indegree' (expected={1})",indegree,nb_message);
178 if (outdegree!=nb_message)
179 ARCANE_FATAL("Bad value '{0}' for 'outdegree' (expected={1})",outdegree,nb_message);
180
181 UniqueArray<int> srcs(indegree);
182 UniqueArray<int> dsts(outdegree);
183
184 MPI_Dist_graph_neighbors(m_topology_communicator,indegree,srcs.data(),MPI_UNWEIGHTED,outdegree,dsts.data(),MPI_UNWEIGHTED);
185
186 for(int k=0; k<outdegree; ++k){
187 int x = dsts[k];
188 if (x!=comm_ranks[k])
189 ARCANE_FATAL("Invalid destination rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
190 }
191
192 for(int k=0; k<indegree; ++k ){
193 int x = srcs[k];
194 if (x!=comm_ranks[k])
195 ARCANE_FATAL("Invalid source rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
196 }
197 }
198 }
199
200 private:
201
202 MpiParallelMng* m_mpi_parallel_mng = nullptr;
203 MPI_Comm m_topology_communicator = MPI_COMM_NULL;
204
205 private:
206
207 void _checkFreeCommunicator()
208 {
209 if (m_topology_communicator!=MPI_COMM_NULL)
210 MPI_Comm_free(&m_topology_communicator);
211 m_topology_communicator = MPI_COMM_NULL;
212 }
213};
214
215/*---------------------------------------------------------------------------*/
216/*---------------------------------------------------------------------------*/
223class MpiVariableSynchronizer
224: public VariableSynchronizer
225{
226 public:
227 MpiVariableSynchronizer(IParallelMng* pm,const ItemGroup& group,
228 Ref<IDataSynchronizeImplementationFactory> implementation_factory,
230 : VariableSynchronizer(pm,group,implementation_factory)
231 , m_topology_info(topology_info)
232 {
233 }
234 public:
235 void compute() override
236 {
238 // Si non nul, calcule la topologie
239 if (m_topology_info.get())
240 m_topology_info->compute(this);
241 }
242 private:
244};
245
246/*---------------------------------------------------------------------------*/
247/*---------------------------------------------------------------------------*/
248
249class MpiParallelMngUtilsFactory
251{
252 public:
253 MpiParallelMngUtilsFactory()
254 : m_synchronizer_version(2)
255 {
256 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="1")
257 m_synchronizer_version = 1;
258 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="2")
259 m_synchronizer_version = 2;
260 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="3")
261 m_synchronizer_version = 3;
262 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="4"){
263 m_synchronizer_version = 4;
264 String v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_BLOCK_SIZE");
265 if (!v.null()){
266 Int32 block_size = 0;
267 if (!builtInGetValue(block_size,v))
268 m_synchronize_block_size = block_size;
269 m_synchronize_block_size = std::clamp(m_synchronize_block_size,0,1000000000);
270 }
271 v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_NB_SEQUENCE");
272 if (!v.null()){
273 Int32 nb_sequence = 0;
274 if (!builtInGetValue(nb_sequence,v))
275 m_synchronize_nb_sequence = nb_sequence;
276 m_synchronize_nb_sequence = std::clamp(m_synchronize_nb_sequence,1,1024*1024);
277 }
278 }
279 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="5")
280 m_synchronizer_version = 5;
281 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="6")
282 m_synchronizer_version = 6;
283 }
284 public:
285
287 {
288 return _createSynchronizer(pm,family->allItems());
289 }
290
292 {
293 return _createSynchronizer(pm,group);
294 }
295
296 private:
297
298 Ref<IVariableSynchronizer> _createSynchronizer(IParallelMng* pm,const ItemGroup& group)
299 {
301 MpiParallelMng* mpi_pm = ARCANE_CHECK_POINTER(dynamic_cast<MpiParallelMng*>(pm));
302 ITraceMng* tm = pm->traceMng();
304 // N'affiche les informations que pour le groupe de toutes les mailles pour éviter d'afficher
305 // plusieurs fois le même message.
306 bool do_print = (group.isAllItems() && group.itemKind()==IK_Cell);
307 if (m_synchronizer_version == 2){
308 if (do_print)
309 tm->info() << "Using MpiSynchronizer V2";
310 generic_factory = arcaneCreateMpiVariableSynchronizerFactory(mpi_pm);
311 }
312 else if (m_synchronizer_version == 3 ){
313 if (do_print)
314 tm->info() << "Using MpiSynchronizer V3";
315 generic_factory = arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(mpi_pm);
316 }
317 else if (m_synchronizer_version == 4){
318 if (do_print)
319 tm->info() << "Using MpiSynchronizer V4 block_size=" << m_synchronize_block_size
320 << " nb_sequence=" << m_synchronize_nb_sequence;
321 generic_factory = arcaneCreateMpiBlockVariableSynchronizerFactory(mpi_pm,m_synchronize_block_size,m_synchronize_nb_sequence);
322 }
323 else if (m_synchronizer_version == 5){
324 if (do_print)
325 tm->info() << "Using MpiSynchronizer V5";
326 topology_info = createRef<VariableSynchronizerMpiCommunicator>(mpi_pm);
327#if defined(ARCANE_HAS_MPI_NEIGHBOR)
328 generic_factory = arcaneCreateMpiNeighborVariableSynchronizerFactory(mpi_pm,topology_info);
329#else
330 throw NotSupportedException(A_FUNCINFO,"Synchronize implementation V5 is not supported with this version of MPI");
331#endif
332 }
333#if defined(ARCANE_HAS_PACKAGE_NCCL)
334 else if (m_synchronizer_version == 6){
335 if (do_print)
336 tm->info() << "Using NCCLSynchronizer";
337 generic_factory = arcaneCreateNCCLVariableSynchronizerFactory(mpi_pm);
338 }
339#endif
340 else{
341 if (do_print)
342 tm->info() << "Using MpiSynchronizer V1";
343 generic_factory = arcaneCreateMpiLegacyVariableSynchronizerFactory(mpi_pm);
344 }
345 if (!generic_factory.get())
346 ARCANE_FATAL("No factory created");
347 return createRef<MpiVariableSynchronizer>(pm,group,generic_factory,topology_info);
348 }
349
350 private:
351
352 Integer m_synchronizer_version = 1;
353 Int32 m_synchronize_block_size = 32000;
354 Int32 m_synchronize_nb_sequence = 1;
355};
356
357/*---------------------------------------------------------------------------*/
358/*---------------------------------------------------------------------------*/
359
360/*---------------------------------------------------------------------------*/
361/*---------------------------------------------------------------------------*/
362
364: public ParallelMngInternal
365{
366 public:
367
368 explicit Impl(MpiParallelMng* pm)
369 : ParallelMngInternal(pm)
370 , m_parallel_mng(pm)
372 {}
373
374 ~Impl() override = default;
375
376 public:
377
379 {
380 return makeRef(m_parallel_mng->adapter()->windowCreator(m_parallel_mng->machineCommunicator())->createWindow(sizeof_segment, sizeof_type));
381 }
382
384 {
385 return makeRef(m_parallel_mng->adapter()->windowCreator(m_parallel_mng->machineCommunicator())->createDynamicWindow(sizeof_segment, sizeof_type));
386 }
387
388 IMemoryAllocator* dynamicMachineMemoryWindowMemoryAllocator() override
389 {
390 return m_alloc.get();
391 }
392
393 private:
394
395 MpiParallelMng* m_parallel_mng;
397};
398
399/*---------------------------------------------------------------------------*/
400/*---------------------------------------------------------------------------*/
401
402/*---------------------------------------------------------------------------*/
403/*---------------------------------------------------------------------------*/
404
405MpiParallelMng::
406MpiParallelMng(const MpiParallelMngBuildInfo& bi)
407: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.dispatchersRef(),bi.messagePassingMngRef()))
408, m_trace(bi.trace_mng)
409, m_thread_mng(bi.thread_mng)
410, m_world_parallel_mng(bi.world_parallel_mng)
411, m_timer_mng(bi.timer_mng)
412, m_replication(new ParallelReplication())
413, m_is_parallel(bi.is_parallel)
414, m_comm_rank(bi.commRank())
415, m_comm_size(bi.commSize())
416, m_stat(bi.stat)
417, m_communicator(bi.mpiComm())
418, m_machine_communicator(bi.mpiMachineComm())
419, m_is_communicator_owned(bi.is_mpi_comm_owned)
420, m_mpi_lock(bi.mpi_lock)
421, m_non_blocking_collective(nullptr)
422, m_utils_factory(createRef<MpiParallelMngUtilsFactory>())
423, m_parallel_mng_internal(new Impl(this))
424{
425 if (!m_world_parallel_mng){
426 m_trace->debug()<<"[MpiParallelMng] No m_world_parallel_mng found, reverting to ourselves!";
427 m_world_parallel_mng = this;
428 }
429}
430
431/*---------------------------------------------------------------------------*/
432/*---------------------------------------------------------------------------*/
433
434MpiParallelMng::
435~MpiParallelMng()
436{
437 delete m_parallel_mng_internal;
438 delete m_non_blocking_collective;
439 m_sequential_parallel_mng.reset();
440 if (m_is_communicator_owned){
441 MpiLock::Section ls(m_mpi_lock);
442 MPI_Comm_free(&m_communicator);
443 MPI_Comm_free(&m_machine_communicator);
444 }
445 delete m_replication;
446 delete m_io_mng;
447 if (m_is_timer_owned)
448 delete m_timer_mng;
449 arcaneCallFunctionAndTerminateIfThrow([&]() { m_adapter->destroy(); });
450 delete m_datatype_list;
451}
452
453/*---------------------------------------------------------------------------*/
454/*---------------------------------------------------------------------------*/
455
456namespace
457{
458
459/*---------------------------------------------------------------------------*/
460/*---------------------------------------------------------------------------*/
461// Classe pour créer les différents dispatchers
462class DispatchCreator
463{
464 public:
465 DispatchCreator(ITraceMng* tm,IMessagePassingMng* mpm,MpiAdapter* adapter,MpiDatatypeList* datatype_list)
466 : m_tm(tm), m_mpm(mpm), m_adapter(adapter), m_datatype_list(datatype_list){}
467 public:
468 template<typename DataType> MpiParallelDispatchT<DataType>*
469 create()
470 {
471 MpiDatatype* dt = m_datatype_list->datatype(DataType());
472 return new MpiParallelDispatchT<DataType>(m_tm,m_mpm,m_adapter,dt);
473 }
474
475 ITraceMng* m_tm;
476 IMessagePassingMng* m_mpm;
477 MpiAdapter* m_adapter;
478 MpiDatatypeList* m_datatype_list;
479};
480
481/*---------------------------------------------------------------------------*/
482/*---------------------------------------------------------------------------*/
483
484class ControlDispatcherDecorator
485: public ParallelMngDispatcher::DefaultControlDispatcher
486{
487 public:
488
489 ControlDispatcherDecorator(IParallelMng* pm, MpiAdapter* adapter)
490 : ParallelMngDispatcher::DefaultControlDispatcher(pm), m_adapter(adapter) {}
491
492 IMessagePassingMng* commSplit(bool keep) override
493 {
494 return m_adapter->commSplit(keep);
495 }
496 MP::IProfiler* profiler() const override { return m_adapter->profiler(); }
497 void setProfiler(MP::IProfiler* p) override { m_adapter->setProfiler(p); }
498
499 private:
500 MpiAdapter* m_adapter;
501};
502}
503
504/*---------------------------------------------------------------------------*/
505/*---------------------------------------------------------------------------*/
506
508build()
509{
510 ITraceMng* tm = traceMng();
511 if (!m_timer_mng){
512 m_timer_mng = new MpiTimerMng(tm);
513 m_is_timer_owned = true;
514 }
515
516 // Créé le gestionnaire séquentiel associé.
517 {
519 bi.setTraceMng(traceMng());
520 bi.setCommunicator(communicator());
521 bi.setThreadMng(threadMng());
522 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
523 }
524
525 // Indique que les reduces doivent être fait dans l'ordre des processeurs
526 // afin de garantir une exécution déterministe
527 bool is_ordered_reduce = false;
528 if (platform::getEnvironmentVariable("ARCANE_ORDERED_REDUCE")=="TRUE")
529 is_ordered_reduce = true;
530 m_datatype_list = new MpiDatatypeList(is_ordered_reduce);
531
532 ARCANE_CHECK_POINTER(m_stat);
533
534 MpiAdapter* adapter = new MpiAdapter(m_trace,m_stat->toArccoreStat(),m_communicator,m_mpi_lock);
535 m_adapter = adapter;
536 auto mpm = _messagePassingMng();
537
538 // NOTE: cette instance sera détruite par le ParallelMngDispatcher
539 auto* control_dispatcher = new ControlDispatcherDecorator(this,m_adapter);
540 _setControlDispatcher(control_dispatcher);
541
542 // NOTE: cette instance sera détruite par le ParallelMngDispatcher
543 auto* serialize_dispatcher = new MpiSerializeDispatcher(m_adapter, mpm);
544 m_mpi_serialize_dispatcher = serialize_dispatcher;
545 _setSerializeDispatcher(serialize_dispatcher);
546
547 DispatchCreator creator(m_trace,mpm,m_adapter,m_datatype_list);
548 this->createDispatchers(creator);
549
550 m_io_mng = arcaneCreateIOMng(this);
551
552 m_non_blocking_collective = new MpiParallelNonBlockingCollective(tm,this,adapter);
553 m_non_blocking_collective->build();
554 if (m_mpi_lock)
555 m_trace->info() << "Using mpi with locks.";
556}
557
558/*---------------------------------------------------------------------------*/
559/*---------------------------------------------------------------------------*/
560
561/*----------------------------------------------------------------------------*/
562/*---------------------------------------------------------------------------*/
563
566{
567 Trace::Setter mci(m_trace,"Mpi");
568 if (m_is_initialized){
569 m_trace->warning() << "MpiParallelMng already initialized";
570 return;
571 }
572
573 m_trace->info() << "Initialisation de MpiParallelMng";
574 m_sequential_parallel_mng->initialize();
575
576 m_adapter->setTimeMetricCollector(timeMetricCollector());
577
578 m_is_initialized = true;
579}
580
581/*---------------------------------------------------------------------------*/
582/*---------------------------------------------------------------------------*/
583
584void MpiParallelMng::
585sendSerializer(ISerializer* s,Int32 rank)
586{
587 Trace::Setter mci(m_trace,"Mpi");
588 Timer::Phase tphase(timeStats(),TP_Communication);
590 m_mpi_serialize_dispatcher->legacySendSerializer(s,{MessageRank(rank),mpi_tag,Blocking});
591}
592
593/*---------------------------------------------------------------------------*/
594/*---------------------------------------------------------------------------*/
595
598{
599 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
600}
601
602/*---------------------------------------------------------------------------*/
603/*---------------------------------------------------------------------------*/
604
605Request MpiParallelMng::
606sendSerializer(ISerializer* s,Int32 rank,[[maybe_unused]] ByteArray& bytes)
607{
608 Trace::Setter mci(m_trace,"Mpi");
609 Timer::Phase tphase(timeStats(),TP_Communication);
611 return m_mpi_serialize_dispatcher->legacySendSerializer(s,{MessageRank(rank),mpi_tag,NonBlocking});
612}
613
614/*---------------------------------------------------------------------------*/
615/*---------------------------------------------------------------------------*/
616
617void MpiParallelMng::
618broadcastSerializer(ISerializer* values,Int32 rank)
619{
620 Timer::Phase tphase(timeStats(),TP_Communication);
621 m_mpi_serialize_dispatcher->broadcastSerializer(values,MessageRank(rank));
622}
623
624/*---------------------------------------------------------------------------*/
625/*---------------------------------------------------------------------------*/
626
627void MpiParallelMng::
628recvSerializer(ISerializer* values,Int32 rank)
629{
630 Trace::Setter mci(m_trace,"Mpi");
631 Timer::Phase tphase(timeStats(),TP_Communication);
633 m_mpi_serialize_dispatcher->legacyReceiveSerializer(values,MessageRank(rank),mpi_tag);
634}
635
636/*---------------------------------------------------------------------------*/
637/*---------------------------------------------------------------------------*/
638
641{
642 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
643}
644
645/*---------------------------------------------------------------------------*/
646/*---------------------------------------------------------------------------*/
647
649probe(const PointToPointMessageInfo& message) -> MessageId
650{
651 return m_adapter->probeMessage(message);
652}
653
654/*---------------------------------------------------------------------------*/
655/*---------------------------------------------------------------------------*/
656
659{
660 return m_adapter->legacyProbeMessage(message);
661}
662
663/*---------------------------------------------------------------------------*/
664/*---------------------------------------------------------------------------*/
665
666Request MpiParallelMng::
667sendSerializer(const ISerializer* s,const PointToPointMessageInfo& message)
668{
669 return m_mpi_serialize_dispatcher->sendSerializer(s,message);
670}
671
672/*---------------------------------------------------------------------------*/
673/*---------------------------------------------------------------------------*/
674
675Request MpiParallelMng::
676receiveSerializer(ISerializer* s,const PointToPointMessageInfo& message)
677{
678 return m_mpi_serialize_dispatcher->receiveSerializer(s,message);
679}
680
681/*---------------------------------------------------------------------------*/
682/*---------------------------------------------------------------------------*/
683
686{
687 for( Integer i=0, is=requests.size(); i<is; ++i )
688 m_adapter->freeRequest(requests[i]);
689}
690
691/*---------------------------------------------------------------------------*/
692/*---------------------------------------------------------------------------*/
693
694void MpiParallelMng::
695_checkFinishedSubRequests()
696{
697 m_mpi_serialize_dispatcher->checkFinishedSubRequests();
698}
699
700/*---------------------------------------------------------------------------*/
701/*---------------------------------------------------------------------------*/
702
703Ref<IParallelMng> MpiParallelMng::
704sequentialParallelMngRef()
705{
706 return m_sequential_parallel_mng;
707}
708
711{
712 return m_sequential_parallel_mng.get();
713}
714
715/*---------------------------------------------------------------------------*/
716/*---------------------------------------------------------------------------*/
717
720{
721 if (m_stat)
722 m_stat->print(m_trace);
723}
724
725/*---------------------------------------------------------------------------*/
726/*---------------------------------------------------------------------------*/
727
729barrier()
730{
731 traceMng()->flush();
732 m_adapter->barrier();
733}
734
735/*---------------------------------------------------------------------------*/
736/*---------------------------------------------------------------------------*/
737
740{
741 m_adapter->waitAllRequests(requests);
742 _checkFinishedSubRequests();
743}
744
745/*---------------------------------------------------------------------------*/
746/*---------------------------------------------------------------------------*/
747
750{
751 return _waitSomeRequests(requests, false);
752}
753
754/*---------------------------------------------------------------------------*/
755/*---------------------------------------------------------------------------*/
756
759{
760 return _waitSomeRequests(requests, true);
761}
762
763/*---------------------------------------------------------------------------*/
764/*---------------------------------------------------------------------------*/
765
766UniqueArray<Integer> MpiParallelMng::
767_waitSomeRequests(ArrayView<Request> requests, bool is_non_blocking)
768{
769 UniqueArray<Integer> results;
770 UniqueArray<bool> done_indexes(requests.size());
771
772 m_adapter->waitSomeRequests(requests, done_indexes, is_non_blocking);
773 for (int i = 0 ; i < requests.size() ; i++) {
774 if (done_indexes[i])
775 results.add(i);
776 }
777 return results;
778}
779
780/*---------------------------------------------------------------------------*/
781/*---------------------------------------------------------------------------*/
782
783ISerializeMessageList* MpiParallelMng::
784_createSerializeMessageList()
785{
786 return new MP::internal::SerializeMessageList(messagePassingMng());
787}
788
789/*---------------------------------------------------------------------------*/
790/*---------------------------------------------------------------------------*/
791
794{
795 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
796}
797
798/*---------------------------------------------------------------------------*/
799/*---------------------------------------------------------------------------*/
800
803{
804 return m_utils_factory->createTransferValuesOperation(this)._release();
805}
806
807/*---------------------------------------------------------------------------*/
808/*---------------------------------------------------------------------------*/
809
812{
813 return m_utils_factory->createExchanger(this)._release();
814}
815
816/*---------------------------------------------------------------------------*/
817/*---------------------------------------------------------------------------*/
818
821{
822 return m_utils_factory->createSynchronizer(this,family)._release();
823}
824
825/*---------------------------------------------------------------------------*/
826/*---------------------------------------------------------------------------*/
827
829createSynchronizer(const ItemGroup& group)
830{
831 return m_utils_factory->createSynchronizer(this,group)._release();
832}
833
834/*---------------------------------------------------------------------------*/
835/*---------------------------------------------------------------------------*/
836
839{
840 return m_utils_factory->createTopology(this)._release();
841}
842
843/*---------------------------------------------------------------------------*/
844/*---------------------------------------------------------------------------*/
845
847replication() const
848{
849 return m_replication;
850}
851
852/*---------------------------------------------------------------------------*/
853/*---------------------------------------------------------------------------*/
854
857{
858 delete m_replication;
859 m_replication = v;
860}
861
862/*---------------------------------------------------------------------------*/
863/*---------------------------------------------------------------------------*/
864
865IParallelMng* MpiParallelMng::
866_createSubParallelMng(MPI_Comm sub_communicator)
867{
868 // Si nul, ce rang ne fait pas partie du sous-communicateur
869 if (sub_communicator==MPI_COMM_NULL)
870 return nullptr;
871
872 int sub_rank = -1;
873 MPI_Comm_rank(sub_communicator,&sub_rank);
874
875 MPI_Comm sub_machine_communicator = MPI_COMM_NULL;
876 MPI_Comm_split_type(sub_communicator, MPI_COMM_TYPE_SHARED, sub_rank, MPI_INFO_NULL, &sub_machine_communicator);
877
878 MpiParallelMngBuildInfo bi(sub_communicator, sub_machine_communicator);
879 bi.is_parallel = isParallel();
880 bi.stat = m_stat;
881 bi.timer_mng = m_timer_mng;
882 bi.thread_mng = m_thread_mng;
883 bi.trace_mng = m_trace;
884 bi.world_parallel_mng = m_world_parallel_mng;
885 bi.mpi_lock = m_mpi_lock;
886
887 IParallelMng* sub_pm = new MpiParallelMng(bi);
888 sub_pm->build();
889 return sub_pm;
890}
891
892/*---------------------------------------------------------------------------*/
893/*---------------------------------------------------------------------------*/
894
895Ref<IParallelMng> MpiParallelMng::
896_createSubParallelMngRef(Int32 color, Int32 key)
897{
898 if (color < 0)
899 color = MPI_UNDEFINED;
900 MPI_Comm sub_communicator = MPI_COMM_NULL;
901 MPI_Comm_split(m_communicator, color, key, &sub_communicator);
902 IParallelMng* sub_pm = _createSubParallelMng(sub_communicator);
903 return makeRef(sub_pm);
904}
905
906/*---------------------------------------------------------------------------*/
907/*---------------------------------------------------------------------------*/
908
909IParallelMng* MpiParallelMng::
910_createSubParallelMng(Int32ConstArrayView kept_ranks)
911{
912 MPI_Group mpi_group = MPI_GROUP_NULL;
913 MPI_Comm_group(m_communicator, &mpi_group);
914 Integer nb_sub_rank = kept_ranks.size();
915 UniqueArray<int> mpi_kept_ranks(nb_sub_rank);
916 for (Integer i = 0; i < nb_sub_rank; ++i)
917 mpi_kept_ranks[i] = (int)kept_ranks[i];
918
919 MPI_Group final_group = MPI_GROUP_NULL;
920 MPI_Group_incl(mpi_group, nb_sub_rank, mpi_kept_ranks.data(), &final_group);
921 MPI_Comm sub_communicator = MPI_COMM_NULL;
922
923 MPI_Comm_create(m_communicator, final_group, &sub_communicator);
924 MPI_Group_free(&final_group);
925 return _createSubParallelMng(sub_communicator);
926}
927
928/*---------------------------------------------------------------------------*/
929/*---------------------------------------------------------------------------*/
938: public MpiRequestList
939{
940 using Base = MpiRequestList;
941 public:
942 explicit RequestList(MpiParallelMng* pm)
943 : Base(pm->m_adapter), m_parallel_mng(pm){}
944 public:
945 void _wait(Parallel::eWaitType wait_type) override
946 {
947 Base::_wait(wait_type);
948 m_parallel_mng->_checkFinishedSubRequests();
949 };
950 private:
951 MpiParallelMng* m_parallel_mng;
952};
953
954/*---------------------------------------------------------------------------*/
955/*---------------------------------------------------------------------------*/
956
962
963/*---------------------------------------------------------------------------*/
964/*---------------------------------------------------------------------------*/
965
968{
969 return m_utils_factory;
970}
971
972/*---------------------------------------------------------------------------*/
973/*---------------------------------------------------------------------------*/
974
975bool MpiParallelMng::
976_isAcceleratorAware() const
977{
979}
980
981/*---------------------------------------------------------------------------*/
982/*---------------------------------------------------------------------------*/
983
984} // End namespace Arcane
985
986/*---------------------------------------------------------------------------*/
987/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Vue modifiable d'un tableau d'un type T.
constexpr Integer size() const noexcept
Retourne la taille du tableau.
const T * data() const
Accès à la racine du tableau hors toute protection.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:36
Interface d'une famille d'entités.
Definition IItemFamily.h:84
virtual ItemGroup allItems() const =0
Groupe de toutes les entités.
Interface d'un allocateur pour la mémoire.
Échange d'informations entre processeurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual ITraceMng * traceMng() const =0
Gestionnaire de traces.
virtual void build()=0
Construit l'instance.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface du gestionnaire de traces.
virtual void flush()=0
Flush tous les flots.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un communicateur MPI spécifique pour les synchronisations.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
eItemKind itemKind() const
Genre du groupe. Il s'agit du genre de ses éléments.
Definition ItemGroup.h:109
bool isAllItems() const
Indique si le groupe est celui de toutes les entités.
Definition ItemGroup.cc:610
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
void _wait(eWaitType wait_type) override
Effectue l'attente ou le test.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Message de réception.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Informations pour envoyer/recevoir un message point à point.
Requête d'un message.
Definition Request.h:77
Message de sérialisation utilisant un BasicSerializer.
static MessageTag defaultTag()
Tag par défaut pour les messages de sérialisation.
Gère les MPI_Datatype associées aux types Arcane.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, const ItemGroup &group) override
Retourne une interface pour synchroniser des variables sur le groupe group.
Ref< IDynamicMachineMemoryWindowBaseInternal > createDynamicMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire dynamique sur le noeud.
Ref< IMachineMemoryWindowBaseInternal > createMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Spécialisation de MpiRequestList pour MpiParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant MPI.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void barrier() override
Effectue une barière.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Bloque en attendant qu'une des requêtes rvalues soit terminée.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void build() override
Construit l'instance.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
bool m_is_initialized
true si déjà initialisé
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IThreadMng * threadMng() const override
Gestionnaire de threads.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void initialize() override
Initialise le gestionnaire du parallélisme.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
bool isParallel() const override
Retourne true si l'exécution est parallèle.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
IParallelReplication * replication() const override
Informations sur la réplication.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Test si une des requêtes rvalues est terminée.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
Gestionnaire du parallélisme utilisant MPI.
Gestionnaire de timer utisant la bibliothèque MPI.
Definition MpiTimerMng.h:39
void compute() override
Recalcule les infos de synchronisation.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeMetricCollector * timeMetricCollector() const override
Collecteur Arccore des statistiques temporelles (peut être nul)
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Chaîne de caractères unicode.
bool null() const
Retourne true si la chaîne est nulle.
Definition String.cc:305
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
MPI_Comm communicator() const override
Récupère le communicateur spécifique de la topologie.
void compute(VariableSynchronizer *var_syncer) override
Calcul le communicateur spécifique.
Interface d'un service de synchronisation de variable.
void compute() override
Création de la liste des éléments de synchronisation.
Int32ConstArrayView communicatingRanks() override
Rangs des sous-domaines avec lesquels on communique.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
ARCCORE_BASE_EXPORT String getEnvironmentVariable(const String &name)
Variable d'environnement du nom name.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
ARCANE_MPI_EXPORT bool arcaneIsAcceleratorAwareMPI()
Indique si le runtime actuel de MPI a le support des accélérateurs.
Definition ArcaneMpi.cc:85
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:121
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:482
@ IK_Cell
Entité de maillage de genre maille.
void arcaneCallFunctionAndTerminateIfThrow(std::function< void()> function)
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un MpiParallelMng.
Infos pour construire un SequentialParallelMng.