Arcane  v3.16.8.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
MpiParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiParallelMng.cc (C) 2000-2025 */
9/* */
10/* Gestionnaire de parallélisme utilisant MPI. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/Collection.h"
15#include "arcane/utils/Enumerator.h"
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/TimeoutException.h"
19#include "arcane/utils/NotImplementedException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/ITraceMng.h"
22#include "arcane/utils/ValueConvert.h"
23#include "arcane/utils/Exception.h"
24#include "arcane/utils/HPReal.h"
25
26#include "arcane/core/IIOMng.h"
27#include "arcane/core/Timer.h"
28#include "arcane/core/IItemFamily.h"
29#include "arcane/core/parallel/IStat.h"
30#include "arcane/core/internal/SerializeMessage.h"
31#include "arcane/core/internal/ParallelMngInternal.h"
32
33#include "arcane/parallel/mpi/MpiParallelMng.h"
34#include "arcane/parallel/mpi/MpiParallelDispatch.h"
35#include "arcane/parallel/mpi/MpiTimerMng.h"
36#include "arcane/parallel/mpi/MpiSerializeMessage.h"
37#include "arcane/parallel/mpi/MpiParallelNonBlockingCollective.h"
38#include "arcane/parallel/mpi/MpiDatatype.h"
39#include "arcane/parallel/mpi/IVariableSynchronizerMpiCommunicator.h"
40
41#include "arcane/impl/ParallelReplication.h"
42#include "arcane/impl/SequentialParallelMng.h"
43#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
44#include "arcane/impl/internal/VariableSynchronizer.h"
45
46#include "arccore/message_passing_mpi/MpiMessagePassingMng.h"
47#include "arccore/message_passing_mpi/internal/MpiSerializeDispatcher.h"
48#include "arccore/message_passing_mpi/internal/MpiRequestList.h"
49#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
50#include "arccore/message_passing_mpi/internal/MpiLock.h"
51#include "arccore/message_passing_mpi/internal/MpiMachineMemoryWindowBaseInternalCreator.h"
52#include "arccore/message_passing_mpi/internal/MpiMachineMemoryWindowBaseInternal.h"
53#include "arccore/message_passing_mpi/internal/MpiDynamicMachineMemoryWindowBaseInternal.h"
54#include "arccore/message_passing/Dispatchers.h"
56#include "arccore/message_passing/SerializeMessageList.h"
57
58//#define ARCANE_TRACE_MPI
59
60/*---------------------------------------------------------------------------*/
61/*---------------------------------------------------------------------------*/
62
63namespace Arcane
64{
65using namespace Arcane::MessagePassing;
66using namespace Arcane::MessagePassing::Mpi;
68
69/*---------------------------------------------------------------------------*/
70/*---------------------------------------------------------------------------*/
71
72extern "C++" IIOMng*
73arcaneCreateIOMng(IParallelMng* psm);
74
75#if defined(ARCANE_HAS_MPI_NEIGHBOR)
76// Défini dans MpiNeighborVariableSynchronizeDispatcher
78arcaneCreateMpiNeighborVariableSynchronizerFactory(MpiParallelMng* mpi_pm,
79 Ref<IVariableSynchronizerMpiCommunicator> synchronizer_communicator);
80#endif
82arcaneCreateMpiBlockVariableSynchronizerFactory(MpiParallelMng* mpi_pm, Int32 block_size, Int32 nb_sequence);
84arcaneCreateMpiVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
86arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
88arcaneCreateMpiLegacyVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
89
90/*---------------------------------------------------------------------------*/
91/*---------------------------------------------------------------------------*/
92
93MpiParallelMngBuildInfo::
94MpiParallelMngBuildInfo(MPI_Comm comm)
95: is_parallel(false)
96, comm_rank(MessagePassing::A_NULL_RANK)
97, comm_nb_rank(0)
98, stat(nullptr)
99, trace_mng(nullptr)
100, timer_mng(nullptr)
101, thread_mng(nullptr)
102, mpi_comm(comm)
103, is_mpi_comm_owned(true)
104, mpi_lock(nullptr)
105{
106 ::MPI_Comm_rank(comm,&comm_rank);
107 ::MPI_Comm_size(comm,&comm_nb_rank);
108
109 m_dispatchers_ref = createRef<MP::Dispatchers>();
110 MP::Mpi::MpiMessagePassingMng::BuildInfo bi(comm_rank,comm_nb_rank,m_dispatchers_ref.get(),mpi_comm);
111
112 m_message_passing_mng_ref = createRef<MP::Mpi::MpiMessagePassingMng>(bi);
113}
114
115/*---------------------------------------------------------------------------*/
116/*---------------------------------------------------------------------------*/
117
118/*---------------------------------------------------------------------------*/
119/*---------------------------------------------------------------------------*/
123class VariableSynchronizerMpiCommunicator
125{
126 public:
127 explicit VariableSynchronizerMpiCommunicator(MpiParallelMng* pm)
128 : m_mpi_parallel_mng(pm){}
129 ~VariableSynchronizerMpiCommunicator() override
130 {
131 _checkFreeCommunicator();
132 }
133 MPI_Comm communicator() const override
134 {
135 return m_topology_communicator;
136 }
137 void compute(VariableSynchronizer* var_syncer) override
138 {
139 Int32ConstArrayView comm_ranks = var_syncer->communicatingRanks();
140 const Int32 nb_message = comm_ranks.size();
141
142 MpiParallelMng* pm = m_mpi_parallel_mng;
143
144 MPI_Comm old_comm = pm->communicator();
145
146 UniqueArray<int> destinations(nb_message);
147 for( Integer i=0; i<nb_message; ++i ){
148 destinations[i] = comm_ranks[i];
149 }
150
151 _checkFreeCommunicator();
152
153 int r = MPI_Dist_graph_create_adjacent(old_comm, nb_message, destinations.data(), MPI_UNWEIGHTED,
154 nb_message, destinations.data(), MPI_UNWEIGHTED,
155 MPI_INFO_NULL, 0, &m_topology_communicator);
156
157 if (r!=MPI_SUCCESS)
158 ARCANE_FATAL("Error '{0}' in MPI_Dist_graph_create",r);
159
160 // Vérifie que l'ordre des rangs pour l'implémentation MPI est le même que celui qu'on a dans
161 // le VariableSynchronizer.
162 {
163 int indegree = 0;
164 int outdegree = 0;
165 int weighted = 0;
166 MPI_Dist_graph_neighbors_count(m_topology_communicator,&indegree,&outdegree,&weighted);
167
168 if (indegree!=nb_message)
169 ARCANE_FATAL("Bad value '{0}' for 'indegree' (expected={1})",indegree,nb_message);
170 if (outdegree!=nb_message)
171 ARCANE_FATAL("Bad value '{0}' for 'outdegree' (expected={1})",outdegree,nb_message);
172
173 UniqueArray<int> srcs(indegree);
174 UniqueArray<int> dsts(outdegree);
175
176 MPI_Dist_graph_neighbors(m_topology_communicator,indegree,srcs.data(),MPI_UNWEIGHTED,outdegree,dsts.data(),MPI_UNWEIGHTED);
177
178 for(int k=0; k<outdegree; ++k){
179 int x = dsts[k];
180 if (x!=comm_ranks[k])
181 ARCANE_FATAL("Invalid destination rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
182 }
183
184 for(int k=0; k<indegree; ++k ){
185 int x = srcs[k];
186 if (x!=comm_ranks[k])
187 ARCANE_FATAL("Invalid source rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
188 }
189 }
190 }
191
192 private:
193
194 MpiParallelMng* m_mpi_parallel_mng = nullptr;
195 MPI_Comm m_topology_communicator = MPI_COMM_NULL;
196
197 private:
198
199 void _checkFreeCommunicator()
200 {
201 if (m_topology_communicator!=MPI_COMM_NULL)
202 MPI_Comm_free(&m_topology_communicator);
203 m_topology_communicator = MPI_COMM_NULL;
204 }
205};
206
207/*---------------------------------------------------------------------------*/
208/*---------------------------------------------------------------------------*/
215class MpiVariableSynchronizer
216: public VariableSynchronizer
217{
218 public:
219 MpiVariableSynchronizer(IParallelMng* pm,const ItemGroup& group,
220 Ref<IDataSynchronizeImplementationFactory> implementation_factory,
222 : VariableSynchronizer(pm,group,implementation_factory)
223 , m_topology_info(topology_info)
224 {
225 }
226 public:
227 void compute() override
228 {
230 // Si non nul, calcule la topologie
231 if (m_topology_info.get())
232 m_topology_info->compute(this);
233 }
234 private:
236};
237
238/*---------------------------------------------------------------------------*/
239/*---------------------------------------------------------------------------*/
240
241class MpiParallelMngUtilsFactory
243{
244 public:
245 MpiParallelMngUtilsFactory()
246 : m_synchronizer_version(2)
247 {
248 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="1")
249 m_synchronizer_version = 1;
250 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="2")
251 m_synchronizer_version = 2;
252 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="3")
253 m_synchronizer_version = 3;
254 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="4"){
255 m_synchronizer_version = 4;
256 String v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_BLOCK_SIZE");
257 if (!v.null()){
258 Int32 block_size = 0;
259 if (!builtInGetValue(block_size,v))
260 m_synchronize_block_size = block_size;
261 m_synchronize_block_size = std::clamp(m_synchronize_block_size,0,1000000000);
262 }
263 v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_NB_SEQUENCE");
264 if (!v.null()){
265 Int32 nb_sequence = 0;
266 if (!builtInGetValue(nb_sequence,v))
267 m_synchronize_nb_sequence = nb_sequence;
268 m_synchronize_nb_sequence = std::clamp(m_synchronize_nb_sequence,1,1024*1024);
269 }
270 }
271 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="5")
272 m_synchronizer_version = 5;
273 }
274 public:
275
277 {
278 return _createSynchronizer(pm,family->allItems());
279 }
280
282 {
283 return _createSynchronizer(pm,group);
284 }
285
286 private:
287
288 Ref<IVariableSynchronizer> _createSynchronizer(IParallelMng* pm,const ItemGroup& group)
289 {
291 MpiParallelMng* mpi_pm = ARCANE_CHECK_POINTER(dynamic_cast<MpiParallelMng*>(pm));
292 ITraceMng* tm = pm->traceMng();
294 // N'affiche les informations que pour le groupe de toutes les mailles pour éviter d'afficher
295 // plusieurs fois le même message.
296 bool do_print = (group.isAllItems() && group.itemKind()==IK_Cell);
297 if (m_synchronizer_version == 2){
298 if (do_print)
299 tm->info() << "Using MpiSynchronizer V2";
300 generic_factory = arcaneCreateMpiVariableSynchronizerFactory(mpi_pm);
301 }
302 else if (m_synchronizer_version == 3 ){
303 if (do_print)
304 tm->info() << "Using MpiSynchronizer V3";
305 generic_factory = arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(mpi_pm);
306 }
307 else if (m_synchronizer_version == 4){
308 if (do_print)
309 tm->info() << "Using MpiSynchronizer V4 block_size=" << m_synchronize_block_size
310 << " nb_sequence=" << m_synchronize_nb_sequence;
311 generic_factory = arcaneCreateMpiBlockVariableSynchronizerFactory(mpi_pm,m_synchronize_block_size,m_synchronize_nb_sequence);
312 }
313 else if (m_synchronizer_version == 5){
314 if (do_print)
315 tm->info() << "Using MpiSynchronizer V5";
316 topology_info = createRef<VariableSynchronizerMpiCommunicator>(mpi_pm);
317#if defined(ARCANE_HAS_MPI_NEIGHBOR)
318 generic_factory = arcaneCreateMpiNeighborVariableSynchronizerFactory(mpi_pm,topology_info);
319#else
320 throw NotSupportedException(A_FUNCINFO,"Synchronize implementation V5 is not supported with this version of MPI");
321#endif
322 }
323 else{
324 if (do_print)
325 tm->info() << "Using MpiSynchronizer V1";
326 generic_factory = arcaneCreateMpiLegacyVariableSynchronizerFactory(mpi_pm);
327 }
328 if (!generic_factory.get())
329 ARCANE_FATAL("No factory created");
330 return createRef<MpiVariableSynchronizer>(pm,group,generic_factory,topology_info);
331 }
332
333 private:
334
335 Integer m_synchronizer_version = 1;
336 Int32 m_synchronize_block_size = 32000;
337 Int32 m_synchronize_nb_sequence = 1;
338};
339
340/*---------------------------------------------------------------------------*/
341/*---------------------------------------------------------------------------*/
342
343/*---------------------------------------------------------------------------*/
344/*---------------------------------------------------------------------------*/
345
347: public ParallelMngInternal
348{
349 public:
350
351 explicit Impl(MpiParallelMng* pm)
352 : ParallelMngInternal(pm)
353 , m_parallel_mng(pm)
354 {}
355
356 ~Impl() override = default;
357
358 public:
359
361 {
362 return makeRef(m_parallel_mng->adapter()->windowCreator()->createWindow(sizeof_segment, sizeof_type));
363 }
364
366 {
367 return makeRef(m_parallel_mng->adapter()->windowCreator()->createDynamicWindow(sizeof_segment, sizeof_type));
368 }
369
370 private:
371
372 MpiParallelMng* m_parallel_mng;
373};
374
375/*---------------------------------------------------------------------------*/
376/*---------------------------------------------------------------------------*/
377
378/*---------------------------------------------------------------------------*/
379/*---------------------------------------------------------------------------*/
380
381MpiParallelMng::
382MpiParallelMng(const MpiParallelMngBuildInfo& bi)
383: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.dispatchersRef(),bi.messagePassingMngRef()))
384, m_trace(bi.trace_mng)
385, m_thread_mng(bi.thread_mng)
386, m_world_parallel_mng(bi.world_parallel_mng)
387, m_timer_mng(bi.timer_mng)
388, m_replication(new ParallelReplication())
389, m_is_parallel(bi.is_parallel)
390, m_comm_rank(bi.commRank())
391, m_comm_size(bi.commSize())
392, m_stat(bi.stat)
393, m_communicator(bi.mpiComm())
394, m_is_communicator_owned(bi.is_mpi_comm_owned)
395, m_mpi_lock(bi.mpi_lock)
396, m_non_blocking_collective(nullptr)
397, m_utils_factory(createRef<MpiParallelMngUtilsFactory>())
398, m_parallel_mng_internal(new Impl(this))
399{
400 if (!m_world_parallel_mng){
401 m_trace->debug()<<"[MpiParallelMng] No m_world_parallel_mng found, reverting to ourselves!";
402 m_world_parallel_mng = this;
403 }
404}
405
406/*---------------------------------------------------------------------------*/
407/*---------------------------------------------------------------------------*/
408
409MpiParallelMng::
410~MpiParallelMng()
411{
412 delete m_parallel_mng_internal;
413 delete m_non_blocking_collective;
414 m_sequential_parallel_mng.reset();
415 if (m_is_communicator_owned){
416 MpiLock::Section ls(m_mpi_lock);
417 MPI_Comm_free(&m_communicator);
418 }
419 delete m_replication;
420 delete m_io_mng;
421 if (m_is_timer_owned)
422 delete m_timer_mng;
423 arcaneCallFunctionAndTerminateIfThrow([&]() { m_adapter->destroy(); });
424 delete m_datatype_list;
425}
426
427/*---------------------------------------------------------------------------*/
428/*---------------------------------------------------------------------------*/
429
430namespace
431{
432
433/*---------------------------------------------------------------------------*/
434/*---------------------------------------------------------------------------*/
435// Classe pour créer les différents dispatchers
436class DispatchCreator
437{
438 public:
439 DispatchCreator(ITraceMng* tm,IMessagePassingMng* mpm,MpiAdapter* adapter,MpiDatatypeList* datatype_list)
440 : m_tm(tm), m_mpm(mpm), m_adapter(adapter), m_datatype_list(datatype_list){}
441 public:
442 template<typename DataType> MpiParallelDispatchT<DataType>*
443 create()
444 {
445 MpiDatatype* dt = m_datatype_list->datatype(DataType());
446 return new MpiParallelDispatchT<DataType>(m_tm,m_mpm,m_adapter,dt);
447 }
448
449 ITraceMng* m_tm;
450 IMessagePassingMng* m_mpm;
451 MpiAdapter* m_adapter;
452 MpiDatatypeList* m_datatype_list;
453};
454
455/*---------------------------------------------------------------------------*/
456/*---------------------------------------------------------------------------*/
457
458class ControlDispatcherDecorator
459: public ParallelMngDispatcher::DefaultControlDispatcher
460{
461 public:
462
463 ControlDispatcherDecorator(IParallelMng* pm, MpiAdapter* adapter)
464 : ParallelMngDispatcher::DefaultControlDispatcher(pm), m_adapter(adapter) {}
465
466 IMessagePassingMng* commSplit(bool keep) override
467 {
468 return m_adapter->commSplit(keep);
469 }
470 MP::IProfiler* profiler() const override { return m_adapter->profiler(); }
471 void setProfiler(MP::IProfiler* p) override { m_adapter->setProfiler(p); }
472
473 private:
474 MpiAdapter* m_adapter;
475};
476}
477
478/*---------------------------------------------------------------------------*/
479/*---------------------------------------------------------------------------*/
480
482build()
483{
484 ITraceMng* tm = traceMng();
485 if (!m_timer_mng){
486 m_timer_mng = new MpiTimerMng(tm);
487 m_is_timer_owned = true;
488 }
489
490 // Créé le gestionnaire séquentiel associé.
491 {
493 bi.setTraceMng(traceMng());
494 bi.setCommunicator(communicator());
495 bi.setThreadMng(threadMng());
496 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
497 }
498
499 // Indique que les reduces doivent être fait dans l'ordre des processeurs
500 // afin de garantir une exécution déterministe
501 bool is_ordered_reduce = false;
502 if (platform::getEnvironmentVariable("ARCANE_ORDERED_REDUCE")=="TRUE")
503 is_ordered_reduce = true;
504 m_datatype_list = new MpiDatatypeList(is_ordered_reduce);
505
506 ARCANE_CHECK_POINTER(m_stat);
507
508 MpiAdapter* adapter = new MpiAdapter(m_trace,m_stat->toArccoreStat(),m_communicator,m_mpi_lock);
509 m_adapter = adapter;
510 auto mpm = _messagePassingMng();
511
512 // NOTE: cette instance sera détruite par le ParallelMngDispatcher
513 auto* control_dispatcher = new ControlDispatcherDecorator(this,m_adapter);
514 _setControlDispatcher(control_dispatcher);
515
516 // NOTE: cette instance sera détruite par le ParallelMngDispatcher
517 auto* serialize_dispatcher = new MpiSerializeDispatcher(m_adapter, mpm);
518 m_mpi_serialize_dispatcher = serialize_dispatcher;
519 _setSerializeDispatcher(serialize_dispatcher);
520
521 DispatchCreator creator(m_trace,mpm,m_adapter,m_datatype_list);
522 this->createDispatchers(creator);
523
524 m_io_mng = arcaneCreateIOMng(this);
525
526 m_non_blocking_collective = new MpiParallelNonBlockingCollective(tm,this,adapter);
527 m_non_blocking_collective->build();
528 if (m_mpi_lock)
529 m_trace->info() << "Using mpi with locks.";
530}
531
532/*---------------------------------------------------------------------------*/
533/*---------------------------------------------------------------------------*/
534
535/*----------------------------------------------------------------------------*/
536/*---------------------------------------------------------------------------*/
537
540{
541 Trace::Setter mci(m_trace,"Mpi");
542 if (m_is_initialized){
543 m_trace->warning() << "MpiParallelMng already initialized";
544 return;
545 }
546
547 m_trace->info() << "Initialisation de MpiParallelMng";
548 m_sequential_parallel_mng->initialize();
549
550 m_adapter->setTimeMetricCollector(timeMetricCollector());
551
552 m_is_initialized = true;
553}
554
555/*---------------------------------------------------------------------------*/
556/*---------------------------------------------------------------------------*/
557
558void MpiParallelMng::
559sendSerializer(ISerializer* s,Int32 rank)
560{
561 Trace::Setter mci(m_trace,"Mpi");
562 Timer::Phase tphase(timeStats(),TP_Communication);
564 m_mpi_serialize_dispatcher->legacySendSerializer(s,{MessageRank(rank),mpi_tag,Blocking});
565}
566
567/*---------------------------------------------------------------------------*/
568/*---------------------------------------------------------------------------*/
569
572{
573 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
574}
575
576/*---------------------------------------------------------------------------*/
577/*---------------------------------------------------------------------------*/
578
579Request MpiParallelMng::
580sendSerializer(ISerializer* s,Int32 rank,[[maybe_unused]] ByteArray& bytes)
581{
582 Trace::Setter mci(m_trace,"Mpi");
583 Timer::Phase tphase(timeStats(),TP_Communication);
585 return m_mpi_serialize_dispatcher->legacySendSerializer(s,{MessageRank(rank),mpi_tag,NonBlocking});
586}
587
588/*---------------------------------------------------------------------------*/
589/*---------------------------------------------------------------------------*/
590
591void MpiParallelMng::
592broadcastSerializer(ISerializer* values,Int32 rank)
593{
594 Timer::Phase tphase(timeStats(),TP_Communication);
595 m_mpi_serialize_dispatcher->broadcastSerializer(values,MessageRank(rank));
596}
597
598/*---------------------------------------------------------------------------*/
599/*---------------------------------------------------------------------------*/
600
601void MpiParallelMng::
602recvSerializer(ISerializer* values,Int32 rank)
603{
604 Trace::Setter mci(m_trace,"Mpi");
605 Timer::Phase tphase(timeStats(),TP_Communication);
607 m_mpi_serialize_dispatcher->legacyReceiveSerializer(values,MessageRank(rank),mpi_tag);
608}
609
610/*---------------------------------------------------------------------------*/
611/*---------------------------------------------------------------------------*/
612
615{
616 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
617}
618
619/*---------------------------------------------------------------------------*/
620/*---------------------------------------------------------------------------*/
621
623probe(const PointToPointMessageInfo& message) -> MessageId
624{
625 return m_adapter->probeMessage(message);
626}
627
628/*---------------------------------------------------------------------------*/
629/*---------------------------------------------------------------------------*/
630
633{
634 return m_adapter->legacyProbeMessage(message);
635}
636
637/*---------------------------------------------------------------------------*/
638/*---------------------------------------------------------------------------*/
639
640Request MpiParallelMng::
641sendSerializer(const ISerializer* s,const PointToPointMessageInfo& message)
642{
643 return m_mpi_serialize_dispatcher->sendSerializer(s,message);
644}
645
646/*---------------------------------------------------------------------------*/
647/*---------------------------------------------------------------------------*/
648
649Request MpiParallelMng::
650receiveSerializer(ISerializer* s,const PointToPointMessageInfo& message)
651{
652 return m_mpi_serialize_dispatcher->receiveSerializer(s,message);
653}
654
655/*---------------------------------------------------------------------------*/
656/*---------------------------------------------------------------------------*/
657
660{
661 for( Integer i=0, is=requests.size(); i<is; ++i )
662 m_adapter->freeRequest(requests[i]);
663}
664
665/*---------------------------------------------------------------------------*/
666/*---------------------------------------------------------------------------*/
667
668void MpiParallelMng::
669_checkFinishedSubRequests()
670{
671 m_mpi_serialize_dispatcher->checkFinishedSubRequests();
672}
673
674/*---------------------------------------------------------------------------*/
675/*---------------------------------------------------------------------------*/
676
677Ref<IParallelMng> MpiParallelMng::
678sequentialParallelMngRef()
679{
680 return m_sequential_parallel_mng;
681}
682
685{
686 return m_sequential_parallel_mng.get();
687}
688
689/*---------------------------------------------------------------------------*/
690/*---------------------------------------------------------------------------*/
691
694{
695 if (m_stat)
696 m_stat->print(m_trace);
697}
698
699/*---------------------------------------------------------------------------*/
700/*---------------------------------------------------------------------------*/
701
703barrier()
704{
705 traceMng()->flush();
706 m_adapter->barrier();
707}
708
709/*---------------------------------------------------------------------------*/
710/*---------------------------------------------------------------------------*/
711
714{
715 m_adapter->waitAllRequests(requests);
716 _checkFinishedSubRequests();
717}
718
719/*---------------------------------------------------------------------------*/
720/*---------------------------------------------------------------------------*/
721
724{
725 return _waitSomeRequests(requests, false);
726}
727
728/*---------------------------------------------------------------------------*/
729/*---------------------------------------------------------------------------*/
730
733{
734 return _waitSomeRequests(requests, true);
735}
736
737/*---------------------------------------------------------------------------*/
738/*---------------------------------------------------------------------------*/
739
740UniqueArray<Integer> MpiParallelMng::
741_waitSomeRequests(ArrayView<Request> requests, bool is_non_blocking)
742{
743 UniqueArray<Integer> results;
744 UniqueArray<bool> done_indexes(requests.size());
745
746 m_adapter->waitSomeRequests(requests, done_indexes, is_non_blocking);
747 for (int i = 0 ; i < requests.size() ; i++) {
748 if (done_indexes[i])
749 results.add(i);
750 }
751 return results;
752}
753
754/*---------------------------------------------------------------------------*/
755/*---------------------------------------------------------------------------*/
756
757ISerializeMessageList* MpiParallelMng::
758_createSerializeMessageList()
759{
760 return new MP::internal::SerializeMessageList(messagePassingMng());
761}
762
763/*---------------------------------------------------------------------------*/
764/*---------------------------------------------------------------------------*/
765
768{
769 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
770}
771
772/*---------------------------------------------------------------------------*/
773/*---------------------------------------------------------------------------*/
774
777{
778 return m_utils_factory->createTransferValuesOperation(this)._release();
779}
780
781/*---------------------------------------------------------------------------*/
782/*---------------------------------------------------------------------------*/
783
786{
787 return m_utils_factory->createExchanger(this)._release();
788}
789
790/*---------------------------------------------------------------------------*/
791/*---------------------------------------------------------------------------*/
792
795{
796 return m_utils_factory->createSynchronizer(this,family)._release();
797}
798
799/*---------------------------------------------------------------------------*/
800/*---------------------------------------------------------------------------*/
801
803createSynchronizer(const ItemGroup& group)
804{
805 return m_utils_factory->createSynchronizer(this,group)._release();
806}
807
808/*---------------------------------------------------------------------------*/
809/*---------------------------------------------------------------------------*/
810
813{
814 return m_utils_factory->createTopology(this)._release();
815}
816
817/*---------------------------------------------------------------------------*/
818/*---------------------------------------------------------------------------*/
819
821replication() const
822{
823 return m_replication;
824}
825
826/*---------------------------------------------------------------------------*/
827/*---------------------------------------------------------------------------*/
828
831{
832 delete m_replication;
833 m_replication = v;
834}
835
836/*---------------------------------------------------------------------------*/
837/*---------------------------------------------------------------------------*/
838
839IParallelMng* MpiParallelMng::
840_createSubParallelMng(MPI_Comm sub_communicator)
841{
842 // Si nul, ce rang ne fait pas partie du sous-communicateur
843 if (sub_communicator==MPI_COMM_NULL)
844 return nullptr;
845
846 int sub_rank = -1;
847 MPI_Comm_rank(sub_communicator,&sub_rank);
848
849 MpiParallelMngBuildInfo bi(sub_communicator);
850 bi.is_parallel = isParallel();
851 bi.stat = m_stat;
852 bi.timer_mng = m_timer_mng;
853 bi.thread_mng = m_thread_mng;
854 bi.trace_mng = m_trace;
855 bi.world_parallel_mng = m_world_parallel_mng;
856 bi.mpi_lock = m_mpi_lock;
857
858 IParallelMng* sub_pm = new MpiParallelMng(bi);
859 sub_pm->build();
860 return sub_pm;
861}
862
863/*---------------------------------------------------------------------------*/
864/*---------------------------------------------------------------------------*/
865
866Ref<IParallelMng> MpiParallelMng::
867_createSubParallelMngRef(Int32 color, Int32 key)
868{
869 if (color < 0)
870 color = MPI_UNDEFINED;
871 MPI_Comm sub_communicator = MPI_COMM_NULL;
872 MPI_Comm_split(m_communicator, color, key, &sub_communicator);
873 IParallelMng* sub_pm = _createSubParallelMng(sub_communicator);
874 return makeRef(sub_pm);
875}
876
877/*---------------------------------------------------------------------------*/
878/*---------------------------------------------------------------------------*/
879
880IParallelMng* MpiParallelMng::
881_createSubParallelMng(Int32ConstArrayView kept_ranks)
882{
883 MPI_Group mpi_group = MPI_GROUP_NULL;
884 MPI_Comm_group(m_communicator, &mpi_group);
885 Integer nb_sub_rank = kept_ranks.size();
886 UniqueArray<int> mpi_kept_ranks(nb_sub_rank);
887 for (Integer i = 0; i < nb_sub_rank; ++i)
888 mpi_kept_ranks[i] = (int)kept_ranks[i];
889
890 MPI_Group final_group = MPI_GROUP_NULL;
891 MPI_Group_incl(mpi_group, nb_sub_rank, mpi_kept_ranks.data(), &final_group);
892 MPI_Comm sub_communicator = MPI_COMM_NULL;
893
894 MPI_Comm_create(m_communicator, final_group, &sub_communicator);
895 MPI_Group_free(&final_group);
896 return _createSubParallelMng(sub_communicator);
897}
898
899/*---------------------------------------------------------------------------*/
900/*---------------------------------------------------------------------------*/
909: public MpiRequestList
910{
911 using Base = MpiRequestList;
912 public:
913 explicit RequestList(MpiParallelMng* pm)
914 : Base(pm->m_adapter), m_parallel_mng(pm){}
915 public:
916 void _wait(Parallel::eWaitType wait_type) override
917 {
918 Base::_wait(wait_type);
919 m_parallel_mng->_checkFinishedSubRequests();
920 };
921 private:
922 MpiParallelMng* m_parallel_mng;
923};
924
925/*---------------------------------------------------------------------------*/
926/*---------------------------------------------------------------------------*/
927
933
934/*---------------------------------------------------------------------------*/
935/*---------------------------------------------------------------------------*/
936
939{
940 return m_utils_factory;
941}
942
943/*---------------------------------------------------------------------------*/
944/*---------------------------------------------------------------------------*/
945
946bool MpiParallelMng::
947_isAcceleratorAware() const
948{
950}
951
952/*---------------------------------------------------------------------------*/
953/*---------------------------------------------------------------------------*/
954
955} // End namespace Arcane
956
957/*---------------------------------------------------------------------------*/
958/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Vue modifiable d'un tableau d'un type T.
constexpr Integer size() const noexcept
Retourne la taille du tableau.
const T * data() const
Accès à la racine du tableau hors toute protection.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:36
Interface d'une famille d'entités.
Definition IItemFamily.h:84
virtual ItemGroup allItems() const =0
Groupe de toutes les entités.
Échange d'informations entre processeurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual ITraceMng * traceMng() const =0
Gestionnaire de traces.
virtual void build()=0
Construit l'instance.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface du gestionnaire de traces.
virtual void flush()=0
Flush tous les flots.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un communicateur MPI spécifique pour les synchronisations.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
eItemKind itemKind() const
Genre du groupe. Il s'agit du genre de ses éléments.
Definition ItemGroup.h:109
bool isAllItems() const
Indique si le groupe est celui de toutes les entités.
Definition ItemGroup.cc:610
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
void _wait(eWaitType wait_type) override
Effectue l'attente ou le test.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Message de réception.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Informations pour envoyer/recevoir un message point à point.
Requête d'un message.
Definition Request.h:77
Message de sérialisation utilisant un BasicSerializer.
static MessageTag defaultTag()
Tag par défaut pour les messages de sérialisation.
Gère les MPI_Datatype associées aux types Arcane.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, const ItemGroup &group) override
Retourne une interface pour synchroniser des variables sur le groupe group.
Ref< IDynamicMachineMemoryWindowBaseInternal > createDynamicMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire dynamique sur le noeud.
Ref< IMachineMemoryWindowBaseInternal > createMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Spécialisation de MpiRequestList pour MpiParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant MPI.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void barrier() override
Effectue une barière.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Bloque en attendant qu'une des requêtes rvalues soit terminée.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void build() override
Construit l'instance.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
bool m_is_initialized
true si déjà initialisé
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IThreadMng * threadMng() const override
Gestionnaire de threads.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void initialize() override
Initialise le gestionnaire du parallélisme.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
bool isParallel() const override
Retourne true si l'exécution est parallèle.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
IParallelReplication * replication() const override
Informations sur la réplication.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Test si une des requêtes rvalues est terminée.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
Gestionnaire du parallélisme utilisant MPI.
Gestionnaire de timer utisant la bibliothèque MPI.
Definition MpiTimerMng.h:39
void compute() override
Recalcule les infos de synchronisation.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeMetricCollector * timeMetricCollector() const override
Collecteur Arccore des statistiques temporelles (peut être nul)
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Chaîne de caractères unicode.
bool null() const
Retourne true si la chaîne est nulle.
Definition String.cc:305
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
MPI_Comm communicator() const override
Récupère le communicateur spécifique de la topologie.
void compute(VariableSynchronizer *var_syncer) override
Calcul le communicateur spécifique.
Interface d'un service de synchronisation de variable.
void compute() override
Création de la liste des éléments de synchronisation.
Int32ConstArrayView communicatingRanks() override
Rangs des sous-domaines avec lesquels on communique.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
ARCCORE_BASE_EXPORT String getEnvironmentVariable(const String &name)
Variable d'environnement du nom name.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
ARCANE_MPI_EXPORT bool arcaneIsAcceleratorAwareMPI()
Indique si le runtime actuel de MPI a le support des accélérateurs.
Definition ArcaneMpi.cc:82
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:208
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:569
@ IK_Cell
Entité de maillage de genre maille.
void arcaneCallFunctionAndTerminateIfThrow(std::function< void()> function)
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un MpiParallelMng.
Infos pour construire un SequentialParallelMng.