Arcane  v4.1.1.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
MpiParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiParallelMng.cc (C) 2000-2025 */
9/* */
10/* Gestionnaire de parallélisme utilisant MPI. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/Collection.h"
15#include "arcane/utils/Enumerator.h"
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/TimeoutException.h"
19#include "arcane/utils/NotImplementedException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/ITraceMng.h"
22#include "arcane/utils/ValueConvert.h"
23#include "arcane/utils/Exception.h"
24#include "arcane/utils/HPReal.h"
25
26#include "arcane/core/IIOMng.h"
27#include "arcane/core/Timer.h"
28#include "arcane/core/IItemFamily.h"
29#include "arcane/core/parallel/IStat.h"
30#include "arcane/core/internal/SerializeMessage.h"
31#include "arcane/core/internal/ParallelMngInternal.h"
32
33#include "arcane/parallel/mpi/MpiParallelMng.h"
34#include "arcane/parallel/mpi/MpiParallelDispatch.h"
35#include "arcane/parallel/mpi/MpiTimerMng.h"
36#include "arcane/parallel/mpi/MpiSerializeMessage.h"
37#include "arcane/parallel/mpi/MpiParallelNonBlockingCollective.h"
38#include "arcane/parallel/mpi/MpiDatatype.h"
39#include "arcane/parallel/mpi/IVariableSynchronizerMpiCommunicator.h"
40
41#include "arcane/impl/ParallelReplication.h"
42#include "arcane/impl/SequentialParallelMng.h"
43#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
44#include "arcane/impl/internal/VariableSynchronizer.h"
45
46#include "arccore/message_passing_mpi/MpiMessagePassingMng.h"
47#include "arccore/message_passing_mpi/internal/MpiSerializeDispatcher.h"
48#include "arccore/message_passing_mpi/internal/MpiRequestList.h"
49#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
50#include "arccore/message_passing_mpi/internal/MpiLock.h"
51#include "arccore/message_passing_mpi/internal/MpiMachineMemoryWindowBaseInternalCreator.h"
52#include "arccore/message_passing_mpi/internal/MpiMachineMemoryWindowBaseInternal.h"
53#include "arccore/message_passing_mpi/internal/MpiDynamicMachineMemoryWindowBaseInternal.h"
54#include "arccore/message_passing/Dispatchers.h"
56#include "arccore/message_passing/SerializeMessageList.h"
57
58//#define ARCANE_TRACE_MPI
59
60/*---------------------------------------------------------------------------*/
61/*---------------------------------------------------------------------------*/
62
63namespace Arcane
64{
65using namespace Arcane::MessagePassing;
66using namespace Arcane::MessagePassing::Mpi;
68
69/*---------------------------------------------------------------------------*/
70/*---------------------------------------------------------------------------*/
71
72extern "C++" IIOMng*
73arcaneCreateIOMng(IParallelMng* psm);
74
75#if defined(ARCANE_HAS_MPI_NEIGHBOR)
76// Défini dans MpiNeighborVariableSynchronizeDispatcher
78arcaneCreateMpiNeighborVariableSynchronizerFactory(MpiParallelMng* mpi_pm,
79 Ref<IVariableSynchronizerMpiCommunicator> synchronizer_communicator);
80#endif
82arcaneCreateMpiBlockVariableSynchronizerFactory(MpiParallelMng* mpi_pm, Int32 block_size, Int32 nb_sequence);
84arcaneCreateMpiVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
86arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
88arcaneCreateMpiLegacyVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
89
90/*---------------------------------------------------------------------------*/
91/*---------------------------------------------------------------------------*/
92
93MpiParallelMngBuildInfo::
94MpiParallelMngBuildInfo(MPI_Comm comm, MPI_Comm machine_comm)
95: is_parallel(false)
96, comm_rank(MessagePassing::A_NULL_RANK)
97, comm_nb_rank(0)
98, stat(nullptr)
99, trace_mng(nullptr)
100, timer_mng(nullptr)
101, thread_mng(nullptr)
102, mpi_comm(comm)
103, mpi_machine_comm(machine_comm)
104, is_mpi_comm_owned(true)
105, mpi_lock(nullptr)
106{
107 ::MPI_Comm_rank(comm,&comm_rank);
108 ::MPI_Comm_size(comm,&comm_nb_rank);
109
110 m_dispatchers_ref = createRef<MP::Dispatchers>();
111 MP::Mpi::MpiMessagePassingMng::BuildInfo bi(comm_rank,comm_nb_rank,m_dispatchers_ref.get(),mpi_comm);
112
113 m_message_passing_mng_ref = createRef<MP::Mpi::MpiMessagePassingMng>(bi);
114}
115
116/*---------------------------------------------------------------------------*/
117/*---------------------------------------------------------------------------*/
118
119/*---------------------------------------------------------------------------*/
120/*---------------------------------------------------------------------------*/
124class VariableSynchronizerMpiCommunicator
126{
127 public:
128 explicit VariableSynchronizerMpiCommunicator(MpiParallelMng* pm)
129 : m_mpi_parallel_mng(pm){}
130 ~VariableSynchronizerMpiCommunicator() override
131 {
132 _checkFreeCommunicator();
133 }
134 MPI_Comm communicator() const override
135 {
136 return m_topology_communicator;
137 }
138 void compute(VariableSynchronizer* var_syncer) override
139 {
140 Int32ConstArrayView comm_ranks = var_syncer->communicatingRanks();
141 const Int32 nb_message = comm_ranks.size();
142
143 MpiParallelMng* pm = m_mpi_parallel_mng;
144
145 MPI_Comm old_comm = pm->communicator();
146
147 UniqueArray<int> destinations(nb_message);
148 for( Integer i=0; i<nb_message; ++i ){
149 destinations[i] = comm_ranks[i];
150 }
151
152 _checkFreeCommunicator();
153
154 int r = MPI_Dist_graph_create_adjacent(old_comm, nb_message, destinations.data(), MPI_UNWEIGHTED,
155 nb_message, destinations.data(), MPI_UNWEIGHTED,
156 MPI_INFO_NULL, 0, &m_topology_communicator);
157
158 if (r!=MPI_SUCCESS)
159 ARCANE_FATAL("Error '{0}' in MPI_Dist_graph_create",r);
160
161 // Vérifie que l'ordre des rangs pour l'implémentation MPI est le même que celui qu'on a dans
162 // le VariableSynchronizer.
163 {
164 int indegree = 0;
165 int outdegree = 0;
166 int weighted = 0;
167 MPI_Dist_graph_neighbors_count(m_topology_communicator,&indegree,&outdegree,&weighted);
168
169 if (indegree!=nb_message)
170 ARCANE_FATAL("Bad value '{0}' for 'indegree' (expected={1})",indegree,nb_message);
171 if (outdegree!=nb_message)
172 ARCANE_FATAL("Bad value '{0}' for 'outdegree' (expected={1})",outdegree,nb_message);
173
174 UniqueArray<int> srcs(indegree);
175 UniqueArray<int> dsts(outdegree);
176
177 MPI_Dist_graph_neighbors(m_topology_communicator,indegree,srcs.data(),MPI_UNWEIGHTED,outdegree,dsts.data(),MPI_UNWEIGHTED);
178
179 for(int k=0; k<outdegree; ++k){
180 int x = dsts[k];
181 if (x!=comm_ranks[k])
182 ARCANE_FATAL("Invalid destination rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
183 }
184
185 for(int k=0; k<indegree; ++k ){
186 int x = srcs[k];
187 if (x!=comm_ranks[k])
188 ARCANE_FATAL("Invalid source rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
189 }
190 }
191 }
192
193 private:
194
195 MpiParallelMng* m_mpi_parallel_mng = nullptr;
196 MPI_Comm m_topology_communicator = MPI_COMM_NULL;
197
198 private:
199
200 void _checkFreeCommunicator()
201 {
202 if (m_topology_communicator!=MPI_COMM_NULL)
203 MPI_Comm_free(&m_topology_communicator);
204 m_topology_communicator = MPI_COMM_NULL;
205 }
206};
207
208/*---------------------------------------------------------------------------*/
209/*---------------------------------------------------------------------------*/
216class MpiVariableSynchronizer
217: public VariableSynchronizer
218{
219 public:
220 MpiVariableSynchronizer(IParallelMng* pm,const ItemGroup& group,
221 Ref<IDataSynchronizeImplementationFactory> implementation_factory,
223 : VariableSynchronizer(pm,group,implementation_factory)
224 , m_topology_info(topology_info)
225 {
226 }
227 public:
228 void compute() override
229 {
231 // Si non nul, calcule la topologie
232 if (m_topology_info.get())
233 m_topology_info->compute(this);
234 }
235 private:
237};
238
239/*---------------------------------------------------------------------------*/
240/*---------------------------------------------------------------------------*/
241
242class MpiParallelMngUtilsFactory
244{
245 public:
246 MpiParallelMngUtilsFactory()
247 : m_synchronizer_version(2)
248 {
249 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="1")
250 m_synchronizer_version = 1;
251 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="2")
252 m_synchronizer_version = 2;
253 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="3")
254 m_synchronizer_version = 3;
255 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="4"){
256 m_synchronizer_version = 4;
257 String v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_BLOCK_SIZE");
258 if (!v.null()){
259 Int32 block_size = 0;
260 if (!builtInGetValue(block_size,v))
261 m_synchronize_block_size = block_size;
262 m_synchronize_block_size = std::clamp(m_synchronize_block_size,0,1000000000);
263 }
264 v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_NB_SEQUENCE");
265 if (!v.null()){
266 Int32 nb_sequence = 0;
267 if (!builtInGetValue(nb_sequence,v))
268 m_synchronize_nb_sequence = nb_sequence;
269 m_synchronize_nb_sequence = std::clamp(m_synchronize_nb_sequence,1,1024*1024);
270 }
271 }
272 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="5")
273 m_synchronizer_version = 5;
274 }
275 public:
276
278 {
279 return _createSynchronizer(pm,family->allItems());
280 }
281
283 {
284 return _createSynchronizer(pm,group);
285 }
286
287 private:
288
289 Ref<IVariableSynchronizer> _createSynchronizer(IParallelMng* pm,const ItemGroup& group)
290 {
292 MpiParallelMng* mpi_pm = ARCANE_CHECK_POINTER(dynamic_cast<MpiParallelMng*>(pm));
293 ITraceMng* tm = pm->traceMng();
295 // N'affiche les informations que pour le groupe de toutes les mailles pour éviter d'afficher
296 // plusieurs fois le même message.
297 bool do_print = (group.isAllItems() && group.itemKind()==IK_Cell);
298 if (m_synchronizer_version == 2){
299 if (do_print)
300 tm->info() << "Using MpiSynchronizer V2";
301 generic_factory = arcaneCreateMpiVariableSynchronizerFactory(mpi_pm);
302 }
303 else if (m_synchronizer_version == 3 ){
304 if (do_print)
305 tm->info() << "Using MpiSynchronizer V3";
306 generic_factory = arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(mpi_pm);
307 }
308 else if (m_synchronizer_version == 4){
309 if (do_print)
310 tm->info() << "Using MpiSynchronizer V4 block_size=" << m_synchronize_block_size
311 << " nb_sequence=" << m_synchronize_nb_sequence;
312 generic_factory = arcaneCreateMpiBlockVariableSynchronizerFactory(mpi_pm,m_synchronize_block_size,m_synchronize_nb_sequence);
313 }
314 else if (m_synchronizer_version == 5){
315 if (do_print)
316 tm->info() << "Using MpiSynchronizer V5";
317 topology_info = createRef<VariableSynchronizerMpiCommunicator>(mpi_pm);
318#if defined(ARCANE_HAS_MPI_NEIGHBOR)
319 generic_factory = arcaneCreateMpiNeighborVariableSynchronizerFactory(mpi_pm,topology_info);
320#else
321 throw NotSupportedException(A_FUNCINFO,"Synchronize implementation V5 is not supported with this version of MPI");
322#endif
323 }
324 else{
325 if (do_print)
326 tm->info() << "Using MpiSynchronizer V1";
327 generic_factory = arcaneCreateMpiLegacyVariableSynchronizerFactory(mpi_pm);
328 }
329 if (!generic_factory.get())
330 ARCANE_FATAL("No factory created");
331 return createRef<MpiVariableSynchronizer>(pm,group,generic_factory,topology_info);
332 }
333
334 private:
335
336 Integer m_synchronizer_version = 1;
337 Int32 m_synchronize_block_size = 32000;
338 Int32 m_synchronize_nb_sequence = 1;
339};
340
341/*---------------------------------------------------------------------------*/
342/*---------------------------------------------------------------------------*/
343
344/*---------------------------------------------------------------------------*/
345/*---------------------------------------------------------------------------*/
346
348: public ParallelMngInternal
349{
350 public:
351
352 explicit Impl(MpiParallelMng* pm)
353 : ParallelMngInternal(pm)
354 , m_parallel_mng(pm)
355 {}
356
357 ~Impl() override = default;
358
359 public:
360
362 {
363 return makeRef(m_parallel_mng->adapter()->windowCreator(m_parallel_mng->machineCommunicator())->createWindow(sizeof_segment, sizeof_type));
364 }
365
367 {
368 return makeRef(m_parallel_mng->adapter()->windowCreator(m_parallel_mng->machineCommunicator())->createDynamicWindow(sizeof_segment, sizeof_type));
369 }
370
371 private:
372
373 MpiParallelMng* m_parallel_mng;
374};
375
376/*---------------------------------------------------------------------------*/
377/*---------------------------------------------------------------------------*/
378
379/*---------------------------------------------------------------------------*/
380/*---------------------------------------------------------------------------*/
381
382MpiParallelMng::
383MpiParallelMng(const MpiParallelMngBuildInfo& bi)
384: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.dispatchersRef(),bi.messagePassingMngRef()))
385, m_trace(bi.trace_mng)
386, m_thread_mng(bi.thread_mng)
387, m_world_parallel_mng(bi.world_parallel_mng)
388, m_timer_mng(bi.timer_mng)
389, m_replication(new ParallelReplication())
390, m_is_parallel(bi.is_parallel)
391, m_comm_rank(bi.commRank())
392, m_comm_size(bi.commSize())
393, m_stat(bi.stat)
394, m_communicator(bi.mpiComm())
395, m_machine_communicator(bi.mpiMachineComm())
396, m_is_communicator_owned(bi.is_mpi_comm_owned)
397, m_mpi_lock(bi.mpi_lock)
398, m_non_blocking_collective(nullptr)
399, m_utils_factory(createRef<MpiParallelMngUtilsFactory>())
400, m_parallel_mng_internal(new Impl(this))
401{
402 if (!m_world_parallel_mng){
403 m_trace->debug()<<"[MpiParallelMng] No m_world_parallel_mng found, reverting to ourselves!";
404 m_world_parallel_mng = this;
405 }
406}
407
408/*---------------------------------------------------------------------------*/
409/*---------------------------------------------------------------------------*/
410
411MpiParallelMng::
412~MpiParallelMng()
413{
414 delete m_parallel_mng_internal;
415 delete m_non_blocking_collective;
416 m_sequential_parallel_mng.reset();
417 if (m_is_communicator_owned){
418 MpiLock::Section ls(m_mpi_lock);
419 MPI_Comm_free(&m_communicator);
420 MPI_Comm_free(&m_machine_communicator);
421 }
422 delete m_replication;
423 delete m_io_mng;
424 if (m_is_timer_owned)
425 delete m_timer_mng;
426 arcaneCallFunctionAndTerminateIfThrow([&]() { m_adapter->destroy(); });
427 delete m_datatype_list;
428}
429
430/*---------------------------------------------------------------------------*/
431/*---------------------------------------------------------------------------*/
432
433namespace
434{
435
436/*---------------------------------------------------------------------------*/
437/*---------------------------------------------------------------------------*/
438// Classe pour créer les différents dispatchers
439class DispatchCreator
440{
441 public:
442 DispatchCreator(ITraceMng* tm,IMessagePassingMng* mpm,MpiAdapter* adapter,MpiDatatypeList* datatype_list)
443 : m_tm(tm), m_mpm(mpm), m_adapter(adapter), m_datatype_list(datatype_list){}
444 public:
445 template<typename DataType> MpiParallelDispatchT<DataType>*
446 create()
447 {
448 MpiDatatype* dt = m_datatype_list->datatype(DataType());
449 return new MpiParallelDispatchT<DataType>(m_tm,m_mpm,m_adapter,dt);
450 }
451
452 ITraceMng* m_tm;
453 IMessagePassingMng* m_mpm;
454 MpiAdapter* m_adapter;
455 MpiDatatypeList* m_datatype_list;
456};
457
458/*---------------------------------------------------------------------------*/
459/*---------------------------------------------------------------------------*/
460
461class ControlDispatcherDecorator
462: public ParallelMngDispatcher::DefaultControlDispatcher
463{
464 public:
465
466 ControlDispatcherDecorator(IParallelMng* pm, MpiAdapter* adapter)
467 : ParallelMngDispatcher::DefaultControlDispatcher(pm), m_adapter(adapter) {}
468
469 IMessagePassingMng* commSplit(bool keep) override
470 {
471 return m_adapter->commSplit(keep);
472 }
473 MP::IProfiler* profiler() const override { return m_adapter->profiler(); }
474 void setProfiler(MP::IProfiler* p) override { m_adapter->setProfiler(p); }
475
476 private:
477 MpiAdapter* m_adapter;
478};
479}
480
481/*---------------------------------------------------------------------------*/
482/*---------------------------------------------------------------------------*/
483
485build()
486{
487 ITraceMng* tm = traceMng();
488 if (!m_timer_mng){
489 m_timer_mng = new MpiTimerMng(tm);
490 m_is_timer_owned = true;
491 }
492
493 // Créé le gestionnaire séquentiel associé.
494 {
496 bi.setTraceMng(traceMng());
497 bi.setCommunicator(communicator());
498 bi.setThreadMng(threadMng());
499 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
500 }
501
502 // Indique que les reduces doivent être fait dans l'ordre des processeurs
503 // afin de garantir une exécution déterministe
504 bool is_ordered_reduce = false;
505 if (platform::getEnvironmentVariable("ARCANE_ORDERED_REDUCE")=="TRUE")
506 is_ordered_reduce = true;
507 m_datatype_list = new MpiDatatypeList(is_ordered_reduce);
508
509 ARCANE_CHECK_POINTER(m_stat);
510
511 MpiAdapter* adapter = new MpiAdapter(m_trace,m_stat->toArccoreStat(),m_communicator,m_mpi_lock);
512 m_adapter = adapter;
513 auto mpm = _messagePassingMng();
514
515 // NOTE: cette instance sera détruite par le ParallelMngDispatcher
516 auto* control_dispatcher = new ControlDispatcherDecorator(this,m_adapter);
517 _setControlDispatcher(control_dispatcher);
518
519 // NOTE: cette instance sera détruite par le ParallelMngDispatcher
520 auto* serialize_dispatcher = new MpiSerializeDispatcher(m_adapter, mpm);
521 m_mpi_serialize_dispatcher = serialize_dispatcher;
522 _setSerializeDispatcher(serialize_dispatcher);
523
524 DispatchCreator creator(m_trace,mpm,m_adapter,m_datatype_list);
525 this->createDispatchers(creator);
526
527 m_io_mng = arcaneCreateIOMng(this);
528
529 m_non_blocking_collective = new MpiParallelNonBlockingCollective(tm,this,adapter);
530 m_non_blocking_collective->build();
531 if (m_mpi_lock)
532 m_trace->info() << "Using mpi with locks.";
533}
534
535/*---------------------------------------------------------------------------*/
536/*---------------------------------------------------------------------------*/
537
538/*----------------------------------------------------------------------------*/
539/*---------------------------------------------------------------------------*/
540
543{
544 Trace::Setter mci(m_trace,"Mpi");
545 if (m_is_initialized){
546 m_trace->warning() << "MpiParallelMng already initialized";
547 return;
548 }
549
550 m_trace->info() << "Initialisation de MpiParallelMng";
551 m_sequential_parallel_mng->initialize();
552
553 m_adapter->setTimeMetricCollector(timeMetricCollector());
554
555 m_is_initialized = true;
556}
557
558/*---------------------------------------------------------------------------*/
559/*---------------------------------------------------------------------------*/
560
561void MpiParallelMng::
562sendSerializer(ISerializer* s,Int32 rank)
563{
564 Trace::Setter mci(m_trace,"Mpi");
565 Timer::Phase tphase(timeStats(),TP_Communication);
567 m_mpi_serialize_dispatcher->legacySendSerializer(s,{MessageRank(rank),mpi_tag,Blocking});
568}
569
570/*---------------------------------------------------------------------------*/
571/*---------------------------------------------------------------------------*/
572
575{
576 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
577}
578
579/*---------------------------------------------------------------------------*/
580/*---------------------------------------------------------------------------*/
581
582Request MpiParallelMng::
583sendSerializer(ISerializer* s,Int32 rank,[[maybe_unused]] ByteArray& bytes)
584{
585 Trace::Setter mci(m_trace,"Mpi");
586 Timer::Phase tphase(timeStats(),TP_Communication);
588 return m_mpi_serialize_dispatcher->legacySendSerializer(s,{MessageRank(rank),mpi_tag,NonBlocking});
589}
590
591/*---------------------------------------------------------------------------*/
592/*---------------------------------------------------------------------------*/
593
594void MpiParallelMng::
595broadcastSerializer(ISerializer* values,Int32 rank)
596{
597 Timer::Phase tphase(timeStats(),TP_Communication);
598 m_mpi_serialize_dispatcher->broadcastSerializer(values,MessageRank(rank));
599}
600
601/*---------------------------------------------------------------------------*/
602/*---------------------------------------------------------------------------*/
603
604void MpiParallelMng::
605recvSerializer(ISerializer* values,Int32 rank)
606{
607 Trace::Setter mci(m_trace,"Mpi");
608 Timer::Phase tphase(timeStats(),TP_Communication);
610 m_mpi_serialize_dispatcher->legacyReceiveSerializer(values,MessageRank(rank),mpi_tag);
611}
612
613/*---------------------------------------------------------------------------*/
614/*---------------------------------------------------------------------------*/
615
618{
619 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
620}
621
622/*---------------------------------------------------------------------------*/
623/*---------------------------------------------------------------------------*/
624
626probe(const PointToPointMessageInfo& message) -> MessageId
627{
628 return m_adapter->probeMessage(message);
629}
630
631/*---------------------------------------------------------------------------*/
632/*---------------------------------------------------------------------------*/
633
636{
637 return m_adapter->legacyProbeMessage(message);
638}
639
640/*---------------------------------------------------------------------------*/
641/*---------------------------------------------------------------------------*/
642
643Request MpiParallelMng::
644sendSerializer(const ISerializer* s,const PointToPointMessageInfo& message)
645{
646 return m_mpi_serialize_dispatcher->sendSerializer(s,message);
647}
648
649/*---------------------------------------------------------------------------*/
650/*---------------------------------------------------------------------------*/
651
652Request MpiParallelMng::
653receiveSerializer(ISerializer* s,const PointToPointMessageInfo& message)
654{
655 return m_mpi_serialize_dispatcher->receiveSerializer(s,message);
656}
657
658/*---------------------------------------------------------------------------*/
659/*---------------------------------------------------------------------------*/
660
663{
664 for( Integer i=0, is=requests.size(); i<is; ++i )
665 m_adapter->freeRequest(requests[i]);
666}
667
668/*---------------------------------------------------------------------------*/
669/*---------------------------------------------------------------------------*/
670
671void MpiParallelMng::
672_checkFinishedSubRequests()
673{
674 m_mpi_serialize_dispatcher->checkFinishedSubRequests();
675}
676
677/*---------------------------------------------------------------------------*/
678/*---------------------------------------------------------------------------*/
679
680Ref<IParallelMng> MpiParallelMng::
681sequentialParallelMngRef()
682{
683 return m_sequential_parallel_mng;
684}
685
688{
689 return m_sequential_parallel_mng.get();
690}
691
692/*---------------------------------------------------------------------------*/
693/*---------------------------------------------------------------------------*/
694
697{
698 if (m_stat)
699 m_stat->print(m_trace);
700}
701
702/*---------------------------------------------------------------------------*/
703/*---------------------------------------------------------------------------*/
704
706barrier()
707{
708 traceMng()->flush();
709 m_adapter->barrier();
710}
711
712/*---------------------------------------------------------------------------*/
713/*---------------------------------------------------------------------------*/
714
717{
718 m_adapter->waitAllRequests(requests);
719 _checkFinishedSubRequests();
720}
721
722/*---------------------------------------------------------------------------*/
723/*---------------------------------------------------------------------------*/
724
727{
728 return _waitSomeRequests(requests, false);
729}
730
731/*---------------------------------------------------------------------------*/
732/*---------------------------------------------------------------------------*/
733
736{
737 return _waitSomeRequests(requests, true);
738}
739
740/*---------------------------------------------------------------------------*/
741/*---------------------------------------------------------------------------*/
742
743UniqueArray<Integer> MpiParallelMng::
744_waitSomeRequests(ArrayView<Request> requests, bool is_non_blocking)
745{
746 UniqueArray<Integer> results;
747 UniqueArray<bool> done_indexes(requests.size());
748
749 m_adapter->waitSomeRequests(requests, done_indexes, is_non_blocking);
750 for (int i = 0 ; i < requests.size() ; i++) {
751 if (done_indexes[i])
752 results.add(i);
753 }
754 return results;
755}
756
757/*---------------------------------------------------------------------------*/
758/*---------------------------------------------------------------------------*/
759
760ISerializeMessageList* MpiParallelMng::
761_createSerializeMessageList()
762{
763 return new MP::internal::SerializeMessageList(messagePassingMng());
764}
765
766/*---------------------------------------------------------------------------*/
767/*---------------------------------------------------------------------------*/
768
771{
772 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
773}
774
775/*---------------------------------------------------------------------------*/
776/*---------------------------------------------------------------------------*/
777
780{
781 return m_utils_factory->createTransferValuesOperation(this)._release();
782}
783
784/*---------------------------------------------------------------------------*/
785/*---------------------------------------------------------------------------*/
786
789{
790 return m_utils_factory->createExchanger(this)._release();
791}
792
793/*---------------------------------------------------------------------------*/
794/*---------------------------------------------------------------------------*/
795
798{
799 return m_utils_factory->createSynchronizer(this,family)._release();
800}
801
802/*---------------------------------------------------------------------------*/
803/*---------------------------------------------------------------------------*/
804
806createSynchronizer(const ItemGroup& group)
807{
808 return m_utils_factory->createSynchronizer(this,group)._release();
809}
810
811/*---------------------------------------------------------------------------*/
812/*---------------------------------------------------------------------------*/
813
816{
817 return m_utils_factory->createTopology(this)._release();
818}
819
820/*---------------------------------------------------------------------------*/
821/*---------------------------------------------------------------------------*/
822
824replication() const
825{
826 return m_replication;
827}
828
829/*---------------------------------------------------------------------------*/
830/*---------------------------------------------------------------------------*/
831
834{
835 delete m_replication;
836 m_replication = v;
837}
838
839/*---------------------------------------------------------------------------*/
840/*---------------------------------------------------------------------------*/
841
842IParallelMng* MpiParallelMng::
843_createSubParallelMng(MPI_Comm sub_communicator)
844{
845 // Si nul, ce rang ne fait pas partie du sous-communicateur
846 if (sub_communicator==MPI_COMM_NULL)
847 return nullptr;
848
849 int sub_rank = -1;
850 MPI_Comm_rank(sub_communicator,&sub_rank);
851
852 MPI_Comm sub_machine_communicator = MPI_COMM_NULL;
853 MPI_Comm_split_type(sub_communicator, MPI_COMM_TYPE_SHARED, sub_rank, MPI_INFO_NULL, &sub_machine_communicator);
854
855 MpiParallelMngBuildInfo bi(sub_communicator, sub_machine_communicator);
856 bi.is_parallel = isParallel();
857 bi.stat = m_stat;
858 bi.timer_mng = m_timer_mng;
859 bi.thread_mng = m_thread_mng;
860 bi.trace_mng = m_trace;
861 bi.world_parallel_mng = m_world_parallel_mng;
862 bi.mpi_lock = m_mpi_lock;
863
864 IParallelMng* sub_pm = new MpiParallelMng(bi);
865 sub_pm->build();
866 return sub_pm;
867}
868
869/*---------------------------------------------------------------------------*/
870/*---------------------------------------------------------------------------*/
871
872Ref<IParallelMng> MpiParallelMng::
873_createSubParallelMngRef(Int32 color, Int32 key)
874{
875 if (color < 0)
876 color = MPI_UNDEFINED;
877 MPI_Comm sub_communicator = MPI_COMM_NULL;
878 MPI_Comm_split(m_communicator, color, key, &sub_communicator);
879 IParallelMng* sub_pm = _createSubParallelMng(sub_communicator);
880 return makeRef(sub_pm);
881}
882
883/*---------------------------------------------------------------------------*/
884/*---------------------------------------------------------------------------*/
885
886IParallelMng* MpiParallelMng::
887_createSubParallelMng(Int32ConstArrayView kept_ranks)
888{
889 MPI_Group mpi_group = MPI_GROUP_NULL;
890 MPI_Comm_group(m_communicator, &mpi_group);
891 Integer nb_sub_rank = kept_ranks.size();
892 UniqueArray<int> mpi_kept_ranks(nb_sub_rank);
893 for (Integer i = 0; i < nb_sub_rank; ++i)
894 mpi_kept_ranks[i] = (int)kept_ranks[i];
895
896 MPI_Group final_group = MPI_GROUP_NULL;
897 MPI_Group_incl(mpi_group, nb_sub_rank, mpi_kept_ranks.data(), &final_group);
898 MPI_Comm sub_communicator = MPI_COMM_NULL;
899
900 MPI_Comm_create(m_communicator, final_group, &sub_communicator);
901 MPI_Group_free(&final_group);
902 return _createSubParallelMng(sub_communicator);
903}
904
905/*---------------------------------------------------------------------------*/
906/*---------------------------------------------------------------------------*/
915: public MpiRequestList
916{
917 using Base = MpiRequestList;
918 public:
919 explicit RequestList(MpiParallelMng* pm)
920 : Base(pm->m_adapter), m_parallel_mng(pm){}
921 public:
922 void _wait(Parallel::eWaitType wait_type) override
923 {
924 Base::_wait(wait_type);
925 m_parallel_mng->_checkFinishedSubRequests();
926 };
927 private:
928 MpiParallelMng* m_parallel_mng;
929};
930
931/*---------------------------------------------------------------------------*/
932/*---------------------------------------------------------------------------*/
933
939
940/*---------------------------------------------------------------------------*/
941/*---------------------------------------------------------------------------*/
942
945{
946 return m_utils_factory;
947}
948
949/*---------------------------------------------------------------------------*/
950/*---------------------------------------------------------------------------*/
951
952bool MpiParallelMng::
953_isAcceleratorAware() const
954{
956}
957
958/*---------------------------------------------------------------------------*/
959/*---------------------------------------------------------------------------*/
960
961} // End namespace Arcane
962
963/*---------------------------------------------------------------------------*/
964/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Vue modifiable d'un tableau d'un type T.
constexpr Integer size() const noexcept
Retourne la taille du tableau.
const T * data() const
Accès à la racine du tableau hors toute protection.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:36
Interface d'une famille d'entités.
Definition IItemFamily.h:84
virtual ItemGroup allItems() const =0
Groupe de toutes les entités.
Échange d'informations entre processeurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual ITraceMng * traceMng() const =0
Gestionnaire de traces.
virtual void build()=0
Construit l'instance.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface du gestionnaire de traces.
virtual void flush()=0
Flush tous les flots.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un communicateur MPI spécifique pour les synchronisations.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
eItemKind itemKind() const
Genre du groupe. Il s'agit du genre de ses éléments.
Definition ItemGroup.h:109
bool isAllItems() const
Indique si le groupe est celui de toutes les entités.
Definition ItemGroup.cc:610
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
void _wait(eWaitType wait_type) override
Effectue l'attente ou le test.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Message de réception.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Informations pour envoyer/recevoir un message point à point.
Requête d'un message.
Definition Request.h:77
Message de sérialisation utilisant un BasicSerializer.
static MessageTag defaultTag()
Tag par défaut pour les messages de sérialisation.
Gère les MPI_Datatype associées aux types Arcane.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, const ItemGroup &group) override
Retourne une interface pour synchroniser des variables sur le groupe group.
Ref< IDynamicMachineMemoryWindowBaseInternal > createDynamicMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire dynamique sur le noeud.
Ref< IMachineMemoryWindowBaseInternal > createMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Spécialisation de MpiRequestList pour MpiParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant MPI.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void barrier() override
Effectue une barière.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Bloque en attendant qu'une des requêtes rvalues soit terminée.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void build() override
Construit l'instance.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
bool m_is_initialized
true si déjà initialisé
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IThreadMng * threadMng() const override
Gestionnaire de threads.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void initialize() override
Initialise le gestionnaire du parallélisme.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
bool isParallel() const override
Retourne true si l'exécution est parallèle.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
IParallelReplication * replication() const override
Informations sur la réplication.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Test si une des requêtes rvalues est terminée.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
Gestionnaire du parallélisme utilisant MPI.
Gestionnaire de timer utisant la bibliothèque MPI.
Definition MpiTimerMng.h:39
void compute() override
Recalcule les infos de synchronisation.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeMetricCollector * timeMetricCollector() const override
Collecteur Arccore des statistiques temporelles (peut être nul)
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Chaîne de caractères unicode.
bool null() const
Retourne true si la chaîne est nulle.
Definition String.cc:305
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
MPI_Comm communicator() const override
Récupère le communicateur spécifique de la topologie.
void compute(VariableSynchronizer *var_syncer) override
Calcul le communicateur spécifique.
Interface d'un service de synchronisation de variable.
void compute() override
Création de la liste des éléments de synchronisation.
Int32ConstArrayView communicatingRanks() override
Rangs des sous-domaines avec lesquels on communique.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
ARCCORE_BASE_EXPORT String getEnvironmentVariable(const String &name)
Variable d'environnement du nom name.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
ARCANE_MPI_EXPORT bool arcaneIsAcceleratorAwareMPI()
Indique si le runtime actuel de MPI a le support des accélérateurs.
Definition ArcaneMpi.cc:82
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:137
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:498
@ IK_Cell
Entité de maillage de genre maille.
void arcaneCallFunctionAndTerminateIfThrow(std::function< void()> function)
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un MpiParallelMng.
Infos pour construire un SequentialParallelMng.