Arcane  v3.16.7.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
MpiParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiParallelMng.cc (C) 2000-2025 */
9/* */
10/* Gestionnaire de parallélisme utilisant MPI. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/Collection.h"
15#include "arcane/utils/Enumerator.h"
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/TimeoutException.h"
19#include "arcane/utils/NotImplementedException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/ITraceMng.h"
22#include "arcane/utils/ValueConvert.h"
23#include "arcane/utils/Exception.h"
24#include "arcane/utils/HPReal.h"
25
26#include "arcane/core/IIOMng.h"
27#include "arcane/core/Timer.h"
28#include "arcane/core/IItemFamily.h"
29#include "arcane/core/parallel/IStat.h"
30#include "arcane/core/internal/SerializeMessage.h"
31#include "arcane/core/internal/ParallelMngInternal.h"
32
33#include "arcane/parallel/mpi/MpiParallelMng.h"
34#include "arcane/parallel/mpi/MpiParallelDispatch.h"
35#include "arcane/parallel/mpi/MpiTimerMng.h"
36#include "arcane/parallel/mpi/MpiSerializeMessage.h"
37#include "arcane/parallel/mpi/MpiParallelNonBlockingCollective.h"
38#include "arcane/parallel/mpi/MpiDatatype.h"
39#include "arcane/parallel/mpi/IVariableSynchronizerMpiCommunicator.h"
40
41#include "arcane/impl/ParallelReplication.h"
42#include "arcane/impl/SequentialParallelMng.h"
43#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
44#include "arcane/impl/internal/VariableSynchronizer.h"
45
46#include "arccore/message_passing_mpi/MpiMessagePassingMng.h"
47#include "arccore/message_passing_mpi/internal/MpiSerializeDispatcher.h"
48#include "arccore/message_passing_mpi/internal/MpiRequestList.h"
49#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
50#include "arccore/message_passing_mpi/internal/MpiLock.h"
51#include "arccore/message_passing_mpi/internal/MpiMachineMemoryWindowBaseInternalCreator.h"
52#include "arccore/message_passing_mpi/internal/MpiMachineMemoryWindowBaseInternal.h"
53#include "arccore/message_passing/Dispatchers.h"
55#include "arccore/message_passing/SerializeMessageList.h"
56
57//#define ARCANE_TRACE_MPI
58
59/*---------------------------------------------------------------------------*/
60/*---------------------------------------------------------------------------*/
61
62namespace Arcane
63{
64using namespace Arcane::MessagePassing;
65using namespace Arcane::MessagePassing::Mpi;
67
68/*---------------------------------------------------------------------------*/
69/*---------------------------------------------------------------------------*/
70
71extern "C++" IIOMng*
72arcaneCreateIOMng(IParallelMng* psm);
73
74#if defined(ARCANE_HAS_MPI_NEIGHBOR)
75// Défini dans MpiNeighborVariableSynchronizeDispatcher
77arcaneCreateMpiNeighborVariableSynchronizerFactory(MpiParallelMng* mpi_pm,
78 Ref<IVariableSynchronizerMpiCommunicator> synchronizer_communicator);
79#endif
81arcaneCreateMpiBlockVariableSynchronizerFactory(MpiParallelMng* mpi_pm, Int32 block_size, Int32 nb_sequence);
83arcaneCreateMpiVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
85arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
87arcaneCreateMpiLegacyVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
88
89/*---------------------------------------------------------------------------*/
90/*---------------------------------------------------------------------------*/
91
92MpiParallelMngBuildInfo::
93MpiParallelMngBuildInfo(MPI_Comm comm)
94: is_parallel(false)
95, comm_rank(MessagePassing::A_NULL_RANK)
96, comm_nb_rank(0)
97, stat(nullptr)
98, trace_mng(nullptr)
99, timer_mng(nullptr)
100, thread_mng(nullptr)
101, mpi_comm(comm)
102, is_mpi_comm_owned(true)
103, mpi_lock(nullptr)
104{
105 ::MPI_Comm_rank(comm,&comm_rank);
106 ::MPI_Comm_size(comm,&comm_nb_rank);
107
108 m_dispatchers_ref = createRef<MP::Dispatchers>();
109 MP::Mpi::MpiMessagePassingMng::BuildInfo bi(comm_rank,comm_nb_rank,m_dispatchers_ref.get(),mpi_comm);
110
111 m_message_passing_mng_ref = createRef<MP::Mpi::MpiMessagePassingMng>(bi);
112}
113
114/*---------------------------------------------------------------------------*/
115/*---------------------------------------------------------------------------*/
116
117/*---------------------------------------------------------------------------*/
118/*---------------------------------------------------------------------------*/
122class VariableSynchronizerMpiCommunicator
124{
125 public:
126 explicit VariableSynchronizerMpiCommunicator(MpiParallelMng* pm)
127 : m_mpi_parallel_mng(pm){}
128 ~VariableSynchronizerMpiCommunicator() override
129 {
130 _checkFreeCommunicator();
131 }
132 MPI_Comm communicator() const override
133 {
134 return m_topology_communicator;
135 }
136 void compute(VariableSynchronizer* var_syncer) override
137 {
138 Int32ConstArrayView comm_ranks = var_syncer->communicatingRanks();
139 const Int32 nb_message = comm_ranks.size();
140
141 MpiParallelMng* pm = m_mpi_parallel_mng;
142
143 MPI_Comm old_comm = pm->communicator();
144
145 UniqueArray<int> destinations(nb_message);
146 for( Integer i=0; i<nb_message; ++i ){
147 destinations[i] = comm_ranks[i];
148 }
149
150 _checkFreeCommunicator();
151
152 int r = MPI_Dist_graph_create_adjacent(old_comm, nb_message, destinations.data(), MPI_UNWEIGHTED,
153 nb_message, destinations.data(), MPI_UNWEIGHTED,
154 MPI_INFO_NULL, 0, &m_topology_communicator);
155
156 if (r!=MPI_SUCCESS)
157 ARCANE_FATAL("Error '{0}' in MPI_Dist_graph_create",r);
158
159 // Vérifie que l'ordre des rangs pour l'implémentation MPI est le même que celui qu'on a dans
160 // le VariableSynchronizer.
161 {
162 int indegree = 0;
163 int outdegree = 0;
164 int weighted = 0;
165 MPI_Dist_graph_neighbors_count(m_topology_communicator,&indegree,&outdegree,&weighted);
166
167 if (indegree!=nb_message)
168 ARCANE_FATAL("Bad value '{0}' for 'indegree' (expected={1})",indegree,nb_message);
169 if (outdegree!=nb_message)
170 ARCANE_FATAL("Bad value '{0}' for 'outdegree' (expected={1})",outdegree,nb_message);
171
172 UniqueArray<int> srcs(indegree);
173 UniqueArray<int> dsts(outdegree);
174
175 MPI_Dist_graph_neighbors(m_topology_communicator,indegree,srcs.data(),MPI_UNWEIGHTED,outdegree,dsts.data(),MPI_UNWEIGHTED);
176
177 for(int k=0; k<outdegree; ++k){
178 int x = dsts[k];
179 if (x!=comm_ranks[k])
180 ARCANE_FATAL("Invalid destination rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
181 }
182
183 for(int k=0; k<indegree; ++k ){
184 int x = srcs[k];
185 if (x!=comm_ranks[k])
186 ARCANE_FATAL("Invalid source rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
187 }
188 }
189 }
190
191 private:
192
193 MpiParallelMng* m_mpi_parallel_mng = nullptr;
194 MPI_Comm m_topology_communicator = MPI_COMM_NULL;
195
196 private:
197
198 void _checkFreeCommunicator()
199 {
200 if (m_topology_communicator!=MPI_COMM_NULL)
201 MPI_Comm_free(&m_topology_communicator);
202 m_topology_communicator = MPI_COMM_NULL;
203 }
204};
205
206/*---------------------------------------------------------------------------*/
207/*---------------------------------------------------------------------------*/
214class MpiVariableSynchronizer
215: public VariableSynchronizer
216{
217 public:
218 MpiVariableSynchronizer(IParallelMng* pm,const ItemGroup& group,
219 Ref<IDataSynchronizeImplementationFactory> implementation_factory,
221 : VariableSynchronizer(pm,group,implementation_factory)
222 , m_topology_info(topology_info)
223 {
224 }
225 public:
226 void compute() override
227 {
229 // Si non nul, calcule la topologie
230 if (m_topology_info.get())
231 m_topology_info->compute(this);
232 }
233 private:
235};
236
237/*---------------------------------------------------------------------------*/
238/*---------------------------------------------------------------------------*/
239
240class MpiParallelMngUtilsFactory
242{
243 public:
244 MpiParallelMngUtilsFactory()
245 : m_synchronizer_version(2)
246 {
247 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="1")
248 m_synchronizer_version = 1;
249 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="2")
250 m_synchronizer_version = 2;
251 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="3")
252 m_synchronizer_version = 3;
253 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="4"){
254 m_synchronizer_version = 4;
255 String v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_BLOCK_SIZE");
256 if (!v.null()){
257 Int32 block_size = 0;
258 if (!builtInGetValue(block_size,v))
259 m_synchronize_block_size = block_size;
260 m_synchronize_block_size = std::clamp(m_synchronize_block_size,0,1000000000);
261 }
262 v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_NB_SEQUENCE");
263 if (!v.null()){
264 Int32 nb_sequence = 0;
265 if (!builtInGetValue(nb_sequence,v))
266 m_synchronize_nb_sequence = nb_sequence;
267 m_synchronize_nb_sequence = std::clamp(m_synchronize_nb_sequence,1,1024*1024);
268 }
269 }
270 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="5")
271 m_synchronizer_version = 5;
272 }
273 public:
274
276 {
277 return _createSynchronizer(pm,family->allItems());
278 }
279
281 {
282 return _createSynchronizer(pm,group);
283 }
284
285 private:
286
287 Ref<IVariableSynchronizer> _createSynchronizer(IParallelMng* pm,const ItemGroup& group)
288 {
290 MpiParallelMng* mpi_pm = ARCANE_CHECK_POINTER(dynamic_cast<MpiParallelMng*>(pm));
291 ITraceMng* tm = pm->traceMng();
293 // N'affiche les informations que pour le groupe de toutes les mailles pour éviter d'afficher
294 // plusieurs fois le même message.
295 bool do_print = (group.isAllItems() && group.itemKind()==IK_Cell);
296 if (m_synchronizer_version == 2){
297 if (do_print)
298 tm->info() << "Using MpiSynchronizer V2";
299 generic_factory = arcaneCreateMpiVariableSynchronizerFactory(mpi_pm);
300 }
301 else if (m_synchronizer_version == 3 ){
302 if (do_print)
303 tm->info() << "Using MpiSynchronizer V3";
304 generic_factory = arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(mpi_pm);
305 }
306 else if (m_synchronizer_version == 4){
307 if (do_print)
308 tm->info() << "Using MpiSynchronizer V4 block_size=" << m_synchronize_block_size
309 << " nb_sequence=" << m_synchronize_nb_sequence;
310 generic_factory = arcaneCreateMpiBlockVariableSynchronizerFactory(mpi_pm,m_synchronize_block_size,m_synchronize_nb_sequence);
311 }
312 else if (m_synchronizer_version == 5){
313 if (do_print)
314 tm->info() << "Using MpiSynchronizer V5";
315 topology_info = createRef<VariableSynchronizerMpiCommunicator>(mpi_pm);
316#if defined(ARCANE_HAS_MPI_NEIGHBOR)
317 generic_factory = arcaneCreateMpiNeighborVariableSynchronizerFactory(mpi_pm,topology_info);
318#else
319 throw NotSupportedException(A_FUNCINFO,"Synchronize implementation V5 is not supported with this version of MPI");
320#endif
321 }
322 else{
323 if (do_print)
324 tm->info() << "Using MpiSynchronizer V1";
325 generic_factory = arcaneCreateMpiLegacyVariableSynchronizerFactory(mpi_pm);
326 }
327 if (!generic_factory.get())
328 ARCANE_FATAL("No factory created");
329 return createRef<MpiVariableSynchronizer>(pm,group,generic_factory,topology_info);
330 }
331
332 private:
333
334 Integer m_synchronizer_version = 1;
335 Int32 m_synchronize_block_size = 32000;
336 Int32 m_synchronize_nb_sequence = 1;
337};
338
339/*---------------------------------------------------------------------------*/
340/*---------------------------------------------------------------------------*/
341
342/*---------------------------------------------------------------------------*/
343/*---------------------------------------------------------------------------*/
344
346: public ParallelMngInternal
347{
348 public:
349
350 explicit Impl(MpiParallelMng* pm)
351 : ParallelMngInternal(pm)
352 , m_parallel_mng(pm)
353 {}
354
355 ~Impl() override = default;
356
357 public:
358
360 {
361 return makeRef(m_parallel_mng->adapter()->windowCreator()->createWindow(sizeof_segment, sizeof_type));
362 }
363
364 private:
365
366 MpiParallelMng* m_parallel_mng;
367};
368
369/*---------------------------------------------------------------------------*/
370/*---------------------------------------------------------------------------*/
371
372/*---------------------------------------------------------------------------*/
373/*---------------------------------------------------------------------------*/
374
375MpiParallelMng::
376MpiParallelMng(const MpiParallelMngBuildInfo& bi)
377: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.dispatchersRef(),bi.messagePassingMngRef()))
378, m_trace(bi.trace_mng)
379, m_thread_mng(bi.thread_mng)
380, m_world_parallel_mng(bi.world_parallel_mng)
381, m_timer_mng(bi.timer_mng)
382, m_replication(new ParallelReplication())
383, m_is_parallel(bi.is_parallel)
384, m_comm_rank(bi.commRank())
385, m_comm_size(bi.commSize())
386, m_stat(bi.stat)
387, m_communicator(bi.mpiComm())
388, m_is_communicator_owned(bi.is_mpi_comm_owned)
389, m_mpi_lock(bi.mpi_lock)
390, m_non_blocking_collective(nullptr)
391, m_utils_factory(createRef<MpiParallelMngUtilsFactory>())
392, m_parallel_mng_internal(new Impl(this))
393{
394 if (!m_world_parallel_mng){
395 m_trace->debug()<<"[MpiParallelMng] No m_world_parallel_mng found, reverting to ourselves!";
396 m_world_parallel_mng = this;
397 }
398}
399
400/*---------------------------------------------------------------------------*/
401/*---------------------------------------------------------------------------*/
402
403MpiParallelMng::
404~MpiParallelMng()
405{
406 delete m_parallel_mng_internal;
407 delete m_non_blocking_collective;
408 m_sequential_parallel_mng.reset();
409 if (m_is_communicator_owned){
410 MpiLock::Section ls(m_mpi_lock);
411 MPI_Comm_free(&m_communicator);
412 }
413 delete m_replication;
414 delete m_io_mng;
415 if (m_is_timer_owned)
416 delete m_timer_mng;
417 arcaneCallFunctionAndTerminateIfThrow([&]() { m_adapter->destroy(); });
418 delete m_datatype_list;
419}
420
421/*---------------------------------------------------------------------------*/
422/*---------------------------------------------------------------------------*/
423
424namespace
425{
426
427/*---------------------------------------------------------------------------*/
428/*---------------------------------------------------------------------------*/
429// Classe pour créer les différents dispatchers
430class DispatchCreator
431{
432 public:
433 DispatchCreator(ITraceMng* tm,IMessagePassingMng* mpm,MpiAdapter* adapter,MpiDatatypeList* datatype_list)
434 : m_tm(tm), m_mpm(mpm), m_adapter(adapter), m_datatype_list(datatype_list){}
435 public:
436 template<typename DataType> MpiParallelDispatchT<DataType>*
437 create()
438 {
439 MpiDatatype* dt = m_datatype_list->datatype(DataType());
440 return new MpiParallelDispatchT<DataType>(m_tm,m_mpm,m_adapter,dt);
441 }
442
443 ITraceMng* m_tm;
444 IMessagePassingMng* m_mpm;
445 MpiAdapter* m_adapter;
446 MpiDatatypeList* m_datatype_list;
447};
448
449/*---------------------------------------------------------------------------*/
450/*---------------------------------------------------------------------------*/
451
452class ControlDispatcherDecorator
453: public ParallelMngDispatcher::DefaultControlDispatcher
454{
455 public:
456
457 ControlDispatcherDecorator(IParallelMng* pm, MpiAdapter* adapter)
458 : ParallelMngDispatcher::DefaultControlDispatcher(pm), m_adapter(adapter) {}
459
460 IMessagePassingMng* commSplit(bool keep) override
461 {
462 return m_adapter->commSplit(keep);
463 }
464 MP::IProfiler* profiler() const override { return m_adapter->profiler(); }
465 void setProfiler(MP::IProfiler* p) override { m_adapter->setProfiler(p); }
466
467 private:
468 MpiAdapter* m_adapter;
469};
470}
471
472/*---------------------------------------------------------------------------*/
473/*---------------------------------------------------------------------------*/
474
476build()
477{
478 ITraceMng* tm = traceMng();
479 if (!m_timer_mng){
480 m_timer_mng = new MpiTimerMng(tm);
481 m_is_timer_owned = true;
482 }
483
484 // Créé le gestionnaire séquentiel associé.
485 {
487 bi.setTraceMng(traceMng());
488 bi.setCommunicator(communicator());
489 bi.setThreadMng(threadMng());
490 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
491 }
492
493 // Indique que les reduces doivent être fait dans l'ordre des processeurs
494 // afin de garantir une exécution déterministe
495 bool is_ordered_reduce = false;
496 if (platform::getEnvironmentVariable("ARCANE_ORDERED_REDUCE")=="TRUE")
497 is_ordered_reduce = true;
498 m_datatype_list = new MpiDatatypeList(is_ordered_reduce);
499
500 ARCANE_CHECK_POINTER(m_stat);
501
502 MpiAdapter* adapter = new MpiAdapter(m_trace,m_stat->toArccoreStat(),m_communicator,m_mpi_lock);
503 m_adapter = adapter;
504 auto mpm = _messagePassingMng();
505
506 // NOTE: cette instance sera détruite par le ParallelMngDispatcher
507 auto* control_dispatcher = new ControlDispatcherDecorator(this,m_adapter);
508 _setControlDispatcher(control_dispatcher);
509
510 // NOTE: cette instance sera détruite par le ParallelMngDispatcher
511 auto* serialize_dispatcher = new MpiSerializeDispatcher(m_adapter, mpm);
512 m_mpi_serialize_dispatcher = serialize_dispatcher;
513 _setSerializeDispatcher(serialize_dispatcher);
514
515 DispatchCreator creator(m_trace,mpm,m_adapter,m_datatype_list);
516 this->createDispatchers(creator);
517
518 m_io_mng = arcaneCreateIOMng(this);
519
520 m_non_blocking_collective = new MpiParallelNonBlockingCollective(tm,this,adapter);
521 m_non_blocking_collective->build();
522 if (m_mpi_lock)
523 m_trace->info() << "Using mpi with locks.";
524}
525
526/*---------------------------------------------------------------------------*/
527/*---------------------------------------------------------------------------*/
528
529/*----------------------------------------------------------------------------*/
530/*---------------------------------------------------------------------------*/
531
534{
535 Trace::Setter mci(m_trace,"Mpi");
536 if (m_is_initialized){
537 m_trace->warning() << "MpiParallelMng already initialized";
538 return;
539 }
540
541 m_trace->info() << "Initialisation de MpiParallelMng";
542 m_sequential_parallel_mng->initialize();
543
544 m_adapter->setTimeMetricCollector(timeMetricCollector());
545
546 m_is_initialized = true;
547}
548
549/*---------------------------------------------------------------------------*/
550/*---------------------------------------------------------------------------*/
551
552void MpiParallelMng::
553sendSerializer(ISerializer* s,Int32 rank)
554{
555 Trace::Setter mci(m_trace,"Mpi");
556 Timer::Phase tphase(timeStats(),TP_Communication);
558 m_mpi_serialize_dispatcher->legacySendSerializer(s,{MessageRank(rank),mpi_tag,Blocking});
559}
560
561/*---------------------------------------------------------------------------*/
562/*---------------------------------------------------------------------------*/
563
566{
567 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
568}
569
570/*---------------------------------------------------------------------------*/
571/*---------------------------------------------------------------------------*/
572
573Request MpiParallelMng::
574sendSerializer(ISerializer* s,Int32 rank,[[maybe_unused]] ByteArray& bytes)
575{
576 Trace::Setter mci(m_trace,"Mpi");
577 Timer::Phase tphase(timeStats(),TP_Communication);
579 return m_mpi_serialize_dispatcher->legacySendSerializer(s,{MessageRank(rank),mpi_tag,NonBlocking});
580}
581
582/*---------------------------------------------------------------------------*/
583/*---------------------------------------------------------------------------*/
584
585void MpiParallelMng::
586broadcastSerializer(ISerializer* values,Int32 rank)
587{
588 Timer::Phase tphase(timeStats(),TP_Communication);
589 m_mpi_serialize_dispatcher->broadcastSerializer(values,MessageRank(rank));
590}
591
592/*---------------------------------------------------------------------------*/
593/*---------------------------------------------------------------------------*/
594
595void MpiParallelMng::
596recvSerializer(ISerializer* values,Int32 rank)
597{
598 Trace::Setter mci(m_trace,"Mpi");
599 Timer::Phase tphase(timeStats(),TP_Communication);
601 m_mpi_serialize_dispatcher->legacyReceiveSerializer(values,MessageRank(rank),mpi_tag);
602}
603
604/*---------------------------------------------------------------------------*/
605/*---------------------------------------------------------------------------*/
606
609{
610 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
611}
612
613/*---------------------------------------------------------------------------*/
614/*---------------------------------------------------------------------------*/
615
617probe(const PointToPointMessageInfo& message) -> MessageId
618{
619 return m_adapter->probeMessage(message);
620}
621
622/*---------------------------------------------------------------------------*/
623/*---------------------------------------------------------------------------*/
624
627{
628 return m_adapter->legacyProbeMessage(message);
629}
630
631/*---------------------------------------------------------------------------*/
632/*---------------------------------------------------------------------------*/
633
634Request MpiParallelMng::
635sendSerializer(const ISerializer* s,const PointToPointMessageInfo& message)
636{
637 return m_mpi_serialize_dispatcher->sendSerializer(s,message);
638}
639
640/*---------------------------------------------------------------------------*/
641/*---------------------------------------------------------------------------*/
642
643Request MpiParallelMng::
644receiveSerializer(ISerializer* s,const PointToPointMessageInfo& message)
645{
646 return m_mpi_serialize_dispatcher->receiveSerializer(s,message);
647}
648
649/*---------------------------------------------------------------------------*/
650/*---------------------------------------------------------------------------*/
651
654{
655 for( Integer i=0, is=requests.size(); i<is; ++i )
656 m_adapter->freeRequest(requests[i]);
657}
658
659/*---------------------------------------------------------------------------*/
660/*---------------------------------------------------------------------------*/
661
662void MpiParallelMng::
663_checkFinishedSubRequests()
664{
665 m_mpi_serialize_dispatcher->checkFinishedSubRequests();
666}
667
668/*---------------------------------------------------------------------------*/
669/*---------------------------------------------------------------------------*/
670
671Ref<IParallelMng> MpiParallelMng::
672sequentialParallelMngRef()
673{
674 return m_sequential_parallel_mng;
675}
676
679{
680 return m_sequential_parallel_mng.get();
681}
682
683/*---------------------------------------------------------------------------*/
684/*---------------------------------------------------------------------------*/
685
688{
689 if (m_stat)
690 m_stat->print(m_trace);
691}
692
693/*---------------------------------------------------------------------------*/
694/*---------------------------------------------------------------------------*/
695
697barrier()
698{
699 traceMng()->flush();
700 m_adapter->barrier();
701}
702
703/*---------------------------------------------------------------------------*/
704/*---------------------------------------------------------------------------*/
705
708{
709 m_adapter->waitAllRequests(requests);
710 _checkFinishedSubRequests();
711}
712
713/*---------------------------------------------------------------------------*/
714/*---------------------------------------------------------------------------*/
715
718{
719 return _waitSomeRequests(requests, false);
720}
721
722/*---------------------------------------------------------------------------*/
723/*---------------------------------------------------------------------------*/
724
727{
728 return _waitSomeRequests(requests, true);
729}
730
731/*---------------------------------------------------------------------------*/
732/*---------------------------------------------------------------------------*/
733
734UniqueArray<Integer> MpiParallelMng::
735_waitSomeRequests(ArrayView<Request> requests, bool is_non_blocking)
736{
737 UniqueArray<Integer> results;
738 UniqueArray<bool> done_indexes(requests.size());
739
740 m_adapter->waitSomeRequests(requests, done_indexes, is_non_blocking);
741 for (int i = 0 ; i < requests.size() ; i++) {
742 if (done_indexes[i])
743 results.add(i);
744 }
745 return results;
746}
747
748/*---------------------------------------------------------------------------*/
749/*---------------------------------------------------------------------------*/
750
751ISerializeMessageList* MpiParallelMng::
752_createSerializeMessageList()
753{
754 return new MP::internal::SerializeMessageList(messagePassingMng());
755}
756
757/*---------------------------------------------------------------------------*/
758/*---------------------------------------------------------------------------*/
759
762{
763 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
764}
765
766/*---------------------------------------------------------------------------*/
767/*---------------------------------------------------------------------------*/
768
771{
772 return m_utils_factory->createTransferValuesOperation(this)._release();
773}
774
775/*---------------------------------------------------------------------------*/
776/*---------------------------------------------------------------------------*/
777
780{
781 return m_utils_factory->createExchanger(this)._release();
782}
783
784/*---------------------------------------------------------------------------*/
785/*---------------------------------------------------------------------------*/
786
789{
790 return m_utils_factory->createSynchronizer(this,family)._release();
791}
792
793/*---------------------------------------------------------------------------*/
794/*---------------------------------------------------------------------------*/
795
797createSynchronizer(const ItemGroup& group)
798{
799 return m_utils_factory->createSynchronizer(this,group)._release();
800}
801
802/*---------------------------------------------------------------------------*/
803/*---------------------------------------------------------------------------*/
804
807{
808 return m_utils_factory->createTopology(this)._release();
809}
810
811/*---------------------------------------------------------------------------*/
812/*---------------------------------------------------------------------------*/
813
815replication() const
816{
817 return m_replication;
818}
819
820/*---------------------------------------------------------------------------*/
821/*---------------------------------------------------------------------------*/
822
825{
826 delete m_replication;
827 m_replication = v;
828}
829
830/*---------------------------------------------------------------------------*/
831/*---------------------------------------------------------------------------*/
832
833IParallelMng* MpiParallelMng::
834_createSubParallelMng(MPI_Comm sub_communicator)
835{
836 // Si nul, ce rang ne fait pas partie du sous-communicateur
837 if (sub_communicator==MPI_COMM_NULL)
838 return nullptr;
839
840 int sub_rank = -1;
841 MPI_Comm_rank(sub_communicator,&sub_rank);
842
843 MpiParallelMngBuildInfo bi(sub_communicator);
844 bi.is_parallel = isParallel();
845 bi.stat = m_stat;
846 bi.timer_mng = m_timer_mng;
847 bi.thread_mng = m_thread_mng;
848 bi.trace_mng = m_trace;
849 bi.world_parallel_mng = m_world_parallel_mng;
850 bi.mpi_lock = m_mpi_lock;
851
852 IParallelMng* sub_pm = new MpiParallelMng(bi);
853 sub_pm->build();
854 return sub_pm;
855}
856
857/*---------------------------------------------------------------------------*/
858/*---------------------------------------------------------------------------*/
859
860Ref<IParallelMng> MpiParallelMng::
861_createSubParallelMngRef(Int32 color, Int32 key)
862{
863 if (color < 0)
864 color = MPI_UNDEFINED;
865 MPI_Comm sub_communicator = MPI_COMM_NULL;
866 MPI_Comm_split(m_communicator, color, key, &sub_communicator);
867 IParallelMng* sub_pm = _createSubParallelMng(sub_communicator);
868 return makeRef(sub_pm);
869}
870
871/*---------------------------------------------------------------------------*/
872/*---------------------------------------------------------------------------*/
873
874IParallelMng* MpiParallelMng::
875_createSubParallelMng(Int32ConstArrayView kept_ranks)
876{
877 MPI_Group mpi_group = MPI_GROUP_NULL;
878 MPI_Comm_group(m_communicator, &mpi_group);
879 Integer nb_sub_rank = kept_ranks.size();
880 UniqueArray<int> mpi_kept_ranks(nb_sub_rank);
881 for (Integer i = 0; i < nb_sub_rank; ++i)
882 mpi_kept_ranks[i] = (int)kept_ranks[i];
883
884 MPI_Group final_group = MPI_GROUP_NULL;
885 MPI_Group_incl(mpi_group, nb_sub_rank, mpi_kept_ranks.data(), &final_group);
886 MPI_Comm sub_communicator = MPI_COMM_NULL;
887
888 MPI_Comm_create(m_communicator, final_group, &sub_communicator);
889 MPI_Group_free(&final_group);
890 return _createSubParallelMng(sub_communicator);
891}
892
893/*---------------------------------------------------------------------------*/
894/*---------------------------------------------------------------------------*/
903: public MpiRequestList
904{
905 using Base = MpiRequestList;
906 public:
907 explicit RequestList(MpiParallelMng* pm)
908 : Base(pm->m_adapter), m_parallel_mng(pm){}
909 public:
910 void _wait(Parallel::eWaitType wait_type) override
911 {
912 Base::_wait(wait_type);
913 m_parallel_mng->_checkFinishedSubRequests();
914 };
915 private:
916 MpiParallelMng* m_parallel_mng;
917};
918
919/*---------------------------------------------------------------------------*/
920/*---------------------------------------------------------------------------*/
921
927
928/*---------------------------------------------------------------------------*/
929/*---------------------------------------------------------------------------*/
930
933{
934 return m_utils_factory;
935}
936
937/*---------------------------------------------------------------------------*/
938/*---------------------------------------------------------------------------*/
939
940bool MpiParallelMng::
941_isAcceleratorAware() const
942{
944}
945
946/*---------------------------------------------------------------------------*/
947/*---------------------------------------------------------------------------*/
948
949} // End namespace Arcane
950
951/*---------------------------------------------------------------------------*/
952/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Vue modifiable d'un tableau d'un type T.
constexpr Integer size() const noexcept
Retourne la taille du tableau.
const T * data() const
Accès à la racine du tableau hors toute protection.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:36
Interface d'une famille d'entités.
Definition IItemFamily.h:84
virtual ItemGroup allItems() const =0
Groupe de toutes les entités.
Échange d'informations entre processeurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual ITraceMng * traceMng() const =0
Gestionnaire de traces.
virtual void build()=0
Construit l'instance.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface du gestionnaire de traces.
virtual void flush()=0
Flush tous les flots.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un communicateur MPI spécifique pour les synchronisations.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
eItemKind itemKind() const
Genre du groupe. Il s'agit du genre de ses éléments.
Definition ItemGroup.h:109
bool isAllItems() const
Indique si le groupe est celui de toutes les entités.
Definition ItemGroup.cc:610
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
void _wait(eWaitType wait_type) override
Effectue l'attente ou le test.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Message de réception.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Informations pour envoyer/recevoir un message point à point.
Requête d'un message.
Definition Request.h:77
Message de sérialisation utilisant un BasicSerializer.
static MessageTag defaultTag()
Tag par défaut pour les messages de sérialisation.
Gère les MPI_Datatype associées aux types Arcane.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, const ItemGroup &group) override
Retourne une interface pour synchroniser des variables sur le groupe group.
Ref< IMachineMemoryWindowBaseInternal > createMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Spécialisation de MpiRequestList pour MpiParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant MPI.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void barrier() override
Effectue une barière.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Bloque en attendant qu'une des requêtes rvalues soit terminée.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void build() override
Construit l'instance.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
bool m_is_initialized
true si déjà initialisé
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IThreadMng * threadMng() const override
Gestionnaire de threads.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void initialize() override
Initialise le gestionnaire du parallélisme.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
bool isParallel() const override
Retourne true si l'exécution est parallèle.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
IParallelReplication * replication() const override
Informations sur la réplication.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Test si une des requêtes rvalues est terminée.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
Gestionnaire du parallélisme utilisant MPI.
Gestionnaire de timer utisant la bibliothèque MPI.
Definition MpiTimerMng.h:39
void compute() override
Recalcule les infos de synchronisation.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeMetricCollector * timeMetricCollector() const override
Collecteur Arccore des statistiques temporelles (peut être nul)
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Chaîne de caractères unicode.
bool null() const
Retourne true si la chaîne est nulle.
Definition String.cc:305
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
MPI_Comm communicator() const override
Récupère le communicateur spécifique de la topologie.
void compute(VariableSynchronizer *var_syncer) override
Calcul le communicateur spécifique.
Interface d'un service de synchronisation de variable.
void compute() override
Création de la liste des éléments de synchronisation.
Int32ConstArrayView communicatingRanks() override
Rangs des sous-domaines avec lesquels on communique.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
ARCCORE_BASE_EXPORT String getEnvironmentVariable(const String &name)
Variable d'environnement du nom name.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
ARCANE_MPI_EXPORT bool arcaneIsAcceleratorAwareMPI()
Indique si le runtime actuel de MPI a le support des accélérateurs.
Definition ArcaneMpi.cc:82
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:208
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:569
@ IK_Cell
Entité de maillage de genre maille.
void arcaneCallFunctionAndTerminateIfThrow(std::function< void()> function)
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un MpiParallelMng.
Infos pour construire un SequentialParallelMng.