Arcane  v4.1.2.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
MpiParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiParallelMng.cc (C) 2000-2025 */
9/* */
10/* Gestionnaire de parallélisme utilisant MPI. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/Collection.h"
15#include "arcane/utils/Enumerator.h"
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/TimeoutException.h"
19#include "arcane/utils/NotImplementedException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/ITraceMng.h"
22#include "arcane/utils/ValueConvert.h"
23#include "arcane/utils/Exception.h"
24#include "arcane/utils/HPReal.h"
25
26#include "arcane/core/IIOMng.h"
27#include "arcane/core/Timer.h"
28#include "arcane/core/IItemFamily.h"
29#include "arcane/core/parallel/IStat.h"
30#include "arcane/core/internal/SerializeMessage.h"
31#include "arcane/core/internal/ParallelMngInternal.h"
32
33#include "arcane/parallel/mpi/MpiParallelMng.h"
34#include "arcane/parallel/mpi/MpiParallelDispatch.h"
35#include "arcane/parallel/mpi/MpiTimerMng.h"
36#include "arcane/parallel/mpi/MpiSerializeMessage.h"
37#include "arcane/parallel/mpi/MpiParallelNonBlockingCollective.h"
38#include "arcane/parallel/mpi/MpiDatatype.h"
39#include "arcane/parallel/mpi/IVariableSynchronizerMpiCommunicator.h"
40
41#include "arcane/impl/ParallelReplication.h"
42#include "arcane/impl/SequentialParallelMng.h"
43#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
44#include "arcane/impl/internal/VariableSynchronizer.h"
45
46#include "arccore/message_passing_mpi/MpiMessagePassingMng.h"
47#include "arccore/message_passing_mpi/internal/MpiSerializeDispatcher.h"
48#include "arccore/message_passing_mpi/internal/MpiRequestList.h"
49#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
50#include "arccore/message_passing_mpi/internal/MpiLock.h"
51#include "arccore/message_passing_mpi/internal/MpiMachineMemoryWindowBaseInternalCreator.h"
52#include "arccore/message_passing_mpi/internal/MpiMachineMemoryWindowBaseInternal.h"
53#include "arccore/message_passing_mpi/internal/MpiDynamicMachineMemoryWindowBaseInternal.h"
54#include "arccore/message_passing/Dispatchers.h"
56#include "arccore/message_passing/internal/SerializeMessageList.h"
57
58#include "arcane_packages.h"
59
60//#define ARCANE_TRACE_MPI
61
62/*---------------------------------------------------------------------------*/
63/*---------------------------------------------------------------------------*/
64
65namespace Arcane
66{
67using namespace Arcane::MessagePassing;
68using namespace Arcane::MessagePassing::Mpi;
70
71/*---------------------------------------------------------------------------*/
72/*---------------------------------------------------------------------------*/
73
74extern "C++" IIOMng*
75arcaneCreateIOMng(IParallelMng* psm);
76
77#if defined(ARCANE_HAS_MPI_NEIGHBOR)
78// Défini dans MpiNeighborVariableSynchronizeDispatcher
80arcaneCreateMpiNeighborVariableSynchronizerFactory(MpiParallelMng* mpi_pm,
81 Ref<IVariableSynchronizerMpiCommunicator> synchronizer_communicator);
82#endif
84arcaneCreateMpiBlockVariableSynchronizerFactory(MpiParallelMng* mpi_pm, Int32 block_size, Int32 nb_sequence);
86arcaneCreateMpiVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
88arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
90arcaneCreateMpiLegacyVariableSynchronizerFactory(MpiParallelMng* mpi_pm);
91#if defined(ARCANE_HAS_PACKAGE_NCCL)
93arcaneCreateNCCLVariableSynchronizerFactory(IParallelMng* mpi_pm);
94#endif
95
96/*---------------------------------------------------------------------------*/
97/*---------------------------------------------------------------------------*/
98
99MpiParallelMngBuildInfo::
100MpiParallelMngBuildInfo(MPI_Comm comm, MPI_Comm machine_comm)
101: is_parallel(false)
102, comm_rank(MessagePassing::A_NULL_RANK)
103, comm_nb_rank(0)
104, stat(nullptr)
105, trace_mng(nullptr)
106, timer_mng(nullptr)
107, thread_mng(nullptr)
108, mpi_comm(comm)
109, mpi_machine_comm(machine_comm)
110, is_mpi_comm_owned(true)
111, mpi_lock(nullptr)
112{
113 ::MPI_Comm_rank(comm,&comm_rank);
114 ::MPI_Comm_size(comm,&comm_nb_rank);
115
116 m_dispatchers_ref = createRef<MP::Dispatchers>();
117 MP::Mpi::MpiMessagePassingMng::BuildInfo bi(comm_rank,comm_nb_rank,m_dispatchers_ref.get(),mpi_comm);
118
119 m_message_passing_mng_ref = createRef<MP::Mpi::MpiMessagePassingMng>(bi);
120}
121
122/*---------------------------------------------------------------------------*/
123/*---------------------------------------------------------------------------*/
124
125/*---------------------------------------------------------------------------*/
126/*---------------------------------------------------------------------------*/
130class VariableSynchronizerMpiCommunicator
132{
133 public:
134 explicit VariableSynchronizerMpiCommunicator(MpiParallelMng* pm)
135 : m_mpi_parallel_mng(pm){}
136 ~VariableSynchronizerMpiCommunicator() override
137 {
138 _checkFreeCommunicator();
139 }
140 MPI_Comm communicator() const override
141 {
142 return m_topology_communicator;
143 }
144 void compute(VariableSynchronizer* var_syncer) override
145 {
146 Int32ConstArrayView comm_ranks = var_syncer->communicatingRanks();
147 const Int32 nb_message = comm_ranks.size();
148
149 MpiParallelMng* pm = m_mpi_parallel_mng;
150
151 MPI_Comm old_comm = pm->communicator();
152
153 UniqueArray<int> destinations(nb_message);
154 for( Integer i=0; i<nb_message; ++i ){
155 destinations[i] = comm_ranks[i];
156 }
157
158 _checkFreeCommunicator();
159
160 int r = MPI_Dist_graph_create_adjacent(old_comm, nb_message, destinations.data(), MPI_UNWEIGHTED,
161 nb_message, destinations.data(), MPI_UNWEIGHTED,
162 MPI_INFO_NULL, 0, &m_topology_communicator);
163
164 if (r!=MPI_SUCCESS)
165 ARCANE_FATAL("Error '{0}' in MPI_Dist_graph_create",r);
166
167 // Vérifie que l'ordre des rangs pour l'implémentation MPI est le même que celui qu'on a dans
168 // le VariableSynchronizer.
169 {
170 int indegree = 0;
171 int outdegree = 0;
172 int weighted = 0;
173 MPI_Dist_graph_neighbors_count(m_topology_communicator,&indegree,&outdegree,&weighted);
174
175 if (indegree!=nb_message)
176 ARCANE_FATAL("Bad value '{0}' for 'indegree' (expected={1})",indegree,nb_message);
177 if (outdegree!=nb_message)
178 ARCANE_FATAL("Bad value '{0}' for 'outdegree' (expected={1})",outdegree,nb_message);
179
180 UniqueArray<int> srcs(indegree);
181 UniqueArray<int> dsts(outdegree);
182
183 MPI_Dist_graph_neighbors(m_topology_communicator,indegree,srcs.data(),MPI_UNWEIGHTED,outdegree,dsts.data(),MPI_UNWEIGHTED);
184
185 for(int k=0; k<outdegree; ++k){
186 int x = dsts[k];
187 if (x!=comm_ranks[k])
188 ARCANE_FATAL("Invalid destination rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
189 }
190
191 for(int k=0; k<indegree; ++k ){
192 int x = srcs[k];
193 if (x!=comm_ranks[k])
194 ARCANE_FATAL("Invalid source rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
195 }
196 }
197 }
198
199 private:
200
201 MpiParallelMng* m_mpi_parallel_mng = nullptr;
202 MPI_Comm m_topology_communicator = MPI_COMM_NULL;
203
204 private:
205
206 void _checkFreeCommunicator()
207 {
208 if (m_topology_communicator!=MPI_COMM_NULL)
209 MPI_Comm_free(&m_topology_communicator);
210 m_topology_communicator = MPI_COMM_NULL;
211 }
212};
213
214/*---------------------------------------------------------------------------*/
215/*---------------------------------------------------------------------------*/
222class MpiVariableSynchronizer
223: public VariableSynchronizer
224{
225 public:
226 MpiVariableSynchronizer(IParallelMng* pm,const ItemGroup& group,
227 Ref<IDataSynchronizeImplementationFactory> implementation_factory,
229 : VariableSynchronizer(pm,group,implementation_factory)
230 , m_topology_info(topology_info)
231 {
232 }
233 public:
234 void compute() override
235 {
237 // Si non nul, calcule la topologie
238 if (m_topology_info.get())
239 m_topology_info->compute(this);
240 }
241 private:
243};
244
245/*---------------------------------------------------------------------------*/
246/*---------------------------------------------------------------------------*/
247
248class MpiParallelMngUtilsFactory
250{
251 public:
252 MpiParallelMngUtilsFactory()
253 : m_synchronizer_version(2)
254 {
255 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="1")
256 m_synchronizer_version = 1;
257 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="2")
258 m_synchronizer_version = 2;
259 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="3")
260 m_synchronizer_version = 3;
261 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="4"){
262 m_synchronizer_version = 4;
263 String v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_BLOCK_SIZE");
264 if (!v.null()){
265 Int32 block_size = 0;
266 if (!builtInGetValue(block_size,v))
267 m_synchronize_block_size = block_size;
268 m_synchronize_block_size = std::clamp(m_synchronize_block_size,0,1000000000);
269 }
270 v = platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_NB_SEQUENCE");
271 if (!v.null()){
272 Int32 nb_sequence = 0;
273 if (!builtInGetValue(nb_sequence,v))
274 m_synchronize_nb_sequence = nb_sequence;
275 m_synchronize_nb_sequence = std::clamp(m_synchronize_nb_sequence,1,1024*1024);
276 }
277 }
278 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="5")
279 m_synchronizer_version = 5;
280 if (platform::getEnvironmentVariable("ARCANE_SYNCHRONIZE_VERSION")=="6")
281 m_synchronizer_version = 6;
282 }
283 public:
284
286 {
287 return _createSynchronizer(pm,family->allItems());
288 }
289
291 {
292 return _createSynchronizer(pm,group);
293 }
294
295 private:
296
297 Ref<IVariableSynchronizer> _createSynchronizer(IParallelMng* pm,const ItemGroup& group)
298 {
300 MpiParallelMng* mpi_pm = ARCANE_CHECK_POINTER(dynamic_cast<MpiParallelMng*>(pm));
301 ITraceMng* tm = pm->traceMng();
303 // N'affiche les informations que pour le groupe de toutes les mailles pour éviter d'afficher
304 // plusieurs fois le même message.
305 bool do_print = (group.isAllItems() && group.itemKind()==IK_Cell);
306 if (m_synchronizer_version == 2){
307 if (do_print)
308 tm->info() << "Using MpiSynchronizer V2";
309 generic_factory = arcaneCreateMpiVariableSynchronizerFactory(mpi_pm);
310 }
311 else if (m_synchronizer_version == 3 ){
312 if (do_print)
313 tm->info() << "Using MpiSynchronizer V3";
314 generic_factory = arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(mpi_pm);
315 }
316 else if (m_synchronizer_version == 4){
317 if (do_print)
318 tm->info() << "Using MpiSynchronizer V4 block_size=" << m_synchronize_block_size
319 << " nb_sequence=" << m_synchronize_nb_sequence;
320 generic_factory = arcaneCreateMpiBlockVariableSynchronizerFactory(mpi_pm,m_synchronize_block_size,m_synchronize_nb_sequence);
321 }
322 else if (m_synchronizer_version == 5){
323 if (do_print)
324 tm->info() << "Using MpiSynchronizer V5";
325 topology_info = createRef<VariableSynchronizerMpiCommunicator>(mpi_pm);
326#if defined(ARCANE_HAS_MPI_NEIGHBOR)
327 generic_factory = arcaneCreateMpiNeighborVariableSynchronizerFactory(mpi_pm,topology_info);
328#else
329 throw NotSupportedException(A_FUNCINFO,"Synchronize implementation V5 is not supported with this version of MPI");
330#endif
331 }
332#if defined(ARCANE_HAS_PACKAGE_NCCL)
333 else if (m_synchronizer_version == 6){
334 if (do_print)
335 tm->info() << "Using NCCLSynchronizer";
336 generic_factory = arcaneCreateNCCLVariableSynchronizerFactory(mpi_pm);
337 }
338#endif
339 else{
340 if (do_print)
341 tm->info() << "Using MpiSynchronizer V1";
342 generic_factory = arcaneCreateMpiLegacyVariableSynchronizerFactory(mpi_pm);
343 }
344 if (!generic_factory.get())
345 ARCANE_FATAL("No factory created");
346 return createRef<MpiVariableSynchronizer>(pm,group,generic_factory,topology_info);
347 }
348
349 private:
350
351 Integer m_synchronizer_version = 1;
352 Int32 m_synchronize_block_size = 32000;
353 Int32 m_synchronize_nb_sequence = 1;
354};
355
356/*---------------------------------------------------------------------------*/
357/*---------------------------------------------------------------------------*/
358
359/*---------------------------------------------------------------------------*/
360/*---------------------------------------------------------------------------*/
361
363: public ParallelMngInternal
364{
365 public:
366
367 explicit Impl(MpiParallelMng* pm)
368 : ParallelMngInternal(pm)
369 , m_parallel_mng(pm)
370 {}
371
372 ~Impl() override = default;
373
374 public:
375
377 {
378 return makeRef(m_parallel_mng->adapter()->windowCreator(m_parallel_mng->machineCommunicator())->createWindow(sizeof_segment, sizeof_type));
379 }
380
382 {
383 return makeRef(m_parallel_mng->adapter()->windowCreator(m_parallel_mng->machineCommunicator())->createDynamicWindow(sizeof_segment, sizeof_type));
384 }
385
386 private:
387
388 MpiParallelMng* m_parallel_mng;
389};
390
391/*---------------------------------------------------------------------------*/
392/*---------------------------------------------------------------------------*/
393
394/*---------------------------------------------------------------------------*/
395/*---------------------------------------------------------------------------*/
396
397MpiParallelMng::
398MpiParallelMng(const MpiParallelMngBuildInfo& bi)
399: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.dispatchersRef(),bi.messagePassingMngRef()))
400, m_trace(bi.trace_mng)
401, m_thread_mng(bi.thread_mng)
402, m_world_parallel_mng(bi.world_parallel_mng)
403, m_timer_mng(bi.timer_mng)
404, m_replication(new ParallelReplication())
405, m_is_parallel(bi.is_parallel)
406, m_comm_rank(bi.commRank())
407, m_comm_size(bi.commSize())
408, m_stat(bi.stat)
409, m_communicator(bi.mpiComm())
410, m_machine_communicator(bi.mpiMachineComm())
411, m_is_communicator_owned(bi.is_mpi_comm_owned)
412, m_mpi_lock(bi.mpi_lock)
413, m_non_blocking_collective(nullptr)
414, m_utils_factory(createRef<MpiParallelMngUtilsFactory>())
415, m_parallel_mng_internal(new Impl(this))
416{
417 if (!m_world_parallel_mng){
418 m_trace->debug()<<"[MpiParallelMng] No m_world_parallel_mng found, reverting to ourselves!";
419 m_world_parallel_mng = this;
420 }
421}
422
423/*---------------------------------------------------------------------------*/
424/*---------------------------------------------------------------------------*/
425
426MpiParallelMng::
427~MpiParallelMng()
428{
429 delete m_parallel_mng_internal;
430 delete m_non_blocking_collective;
431 m_sequential_parallel_mng.reset();
432 if (m_is_communicator_owned){
433 MpiLock::Section ls(m_mpi_lock);
434 MPI_Comm_free(&m_communicator);
435 MPI_Comm_free(&m_machine_communicator);
436 }
437 delete m_replication;
438 delete m_io_mng;
439 if (m_is_timer_owned)
440 delete m_timer_mng;
441 arcaneCallFunctionAndTerminateIfThrow([&]() { m_adapter->destroy(); });
442 delete m_datatype_list;
443}
444
445/*---------------------------------------------------------------------------*/
446/*---------------------------------------------------------------------------*/
447
448namespace
449{
450
451/*---------------------------------------------------------------------------*/
452/*---------------------------------------------------------------------------*/
453// Classe pour créer les différents dispatchers
454class DispatchCreator
455{
456 public:
457 DispatchCreator(ITraceMng* tm,IMessagePassingMng* mpm,MpiAdapter* adapter,MpiDatatypeList* datatype_list)
458 : m_tm(tm), m_mpm(mpm), m_adapter(adapter), m_datatype_list(datatype_list){}
459 public:
460 template<typename DataType> MpiParallelDispatchT<DataType>*
461 create()
462 {
463 MpiDatatype* dt = m_datatype_list->datatype(DataType());
464 return new MpiParallelDispatchT<DataType>(m_tm,m_mpm,m_adapter,dt);
465 }
466
467 ITraceMng* m_tm;
468 IMessagePassingMng* m_mpm;
469 MpiAdapter* m_adapter;
470 MpiDatatypeList* m_datatype_list;
471};
472
473/*---------------------------------------------------------------------------*/
474/*---------------------------------------------------------------------------*/
475
476class ControlDispatcherDecorator
477: public ParallelMngDispatcher::DefaultControlDispatcher
478{
479 public:
480
481 ControlDispatcherDecorator(IParallelMng* pm, MpiAdapter* adapter)
482 : ParallelMngDispatcher::DefaultControlDispatcher(pm), m_adapter(adapter) {}
483
484 IMessagePassingMng* commSplit(bool keep) override
485 {
486 return m_adapter->commSplit(keep);
487 }
488 MP::IProfiler* profiler() const override { return m_adapter->profiler(); }
489 void setProfiler(MP::IProfiler* p) override { m_adapter->setProfiler(p); }
490
491 private:
492 MpiAdapter* m_adapter;
493};
494}
495
496/*---------------------------------------------------------------------------*/
497/*---------------------------------------------------------------------------*/
498
500build()
501{
502 ITraceMng* tm = traceMng();
503 if (!m_timer_mng){
504 m_timer_mng = new MpiTimerMng(tm);
505 m_is_timer_owned = true;
506 }
507
508 // Créé le gestionnaire séquentiel associé.
509 {
511 bi.setTraceMng(traceMng());
512 bi.setCommunicator(communicator());
513 bi.setThreadMng(threadMng());
514 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
515 }
516
517 // Indique que les reduces doivent être fait dans l'ordre des processeurs
518 // afin de garantir une exécution déterministe
519 bool is_ordered_reduce = false;
520 if (platform::getEnvironmentVariable("ARCANE_ORDERED_REDUCE")=="TRUE")
521 is_ordered_reduce = true;
522 m_datatype_list = new MpiDatatypeList(is_ordered_reduce);
523
524 ARCANE_CHECK_POINTER(m_stat);
525
526 MpiAdapter* adapter = new MpiAdapter(m_trace,m_stat->toArccoreStat(),m_communicator,m_mpi_lock);
527 m_adapter = adapter;
528 auto mpm = _messagePassingMng();
529
530 // NOTE: cette instance sera détruite par le ParallelMngDispatcher
531 auto* control_dispatcher = new ControlDispatcherDecorator(this,m_adapter);
532 _setControlDispatcher(control_dispatcher);
533
534 // NOTE: cette instance sera détruite par le ParallelMngDispatcher
535 auto* serialize_dispatcher = new MpiSerializeDispatcher(m_adapter, mpm);
536 m_mpi_serialize_dispatcher = serialize_dispatcher;
537 _setSerializeDispatcher(serialize_dispatcher);
538
539 DispatchCreator creator(m_trace,mpm,m_adapter,m_datatype_list);
540 this->createDispatchers(creator);
541
542 m_io_mng = arcaneCreateIOMng(this);
543
544 m_non_blocking_collective = new MpiParallelNonBlockingCollective(tm,this,adapter);
545 m_non_blocking_collective->build();
546 if (m_mpi_lock)
547 m_trace->info() << "Using mpi with locks.";
548}
549
550/*---------------------------------------------------------------------------*/
551/*---------------------------------------------------------------------------*/
552
553/*----------------------------------------------------------------------------*/
554/*---------------------------------------------------------------------------*/
555
558{
559 Trace::Setter mci(m_trace,"Mpi");
560 if (m_is_initialized){
561 m_trace->warning() << "MpiParallelMng already initialized";
562 return;
563 }
564
565 m_trace->info() << "Initialisation de MpiParallelMng";
566 m_sequential_parallel_mng->initialize();
567
568 m_adapter->setTimeMetricCollector(timeMetricCollector());
569
570 m_is_initialized = true;
571}
572
573/*---------------------------------------------------------------------------*/
574/*---------------------------------------------------------------------------*/
575
576void MpiParallelMng::
577sendSerializer(ISerializer* s,Int32 rank)
578{
579 Trace::Setter mci(m_trace,"Mpi");
580 Timer::Phase tphase(timeStats(),TP_Communication);
582 m_mpi_serialize_dispatcher->legacySendSerializer(s,{MessageRank(rank),mpi_tag,Blocking});
583}
584
585/*---------------------------------------------------------------------------*/
586/*---------------------------------------------------------------------------*/
587
590{
591 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
592}
593
594/*---------------------------------------------------------------------------*/
595/*---------------------------------------------------------------------------*/
596
597Request MpiParallelMng::
598sendSerializer(ISerializer* s,Int32 rank,[[maybe_unused]] ByteArray& bytes)
599{
600 Trace::Setter mci(m_trace,"Mpi");
601 Timer::Phase tphase(timeStats(),TP_Communication);
603 return m_mpi_serialize_dispatcher->legacySendSerializer(s,{MessageRank(rank),mpi_tag,NonBlocking});
604}
605
606/*---------------------------------------------------------------------------*/
607/*---------------------------------------------------------------------------*/
608
609void MpiParallelMng::
610broadcastSerializer(ISerializer* values,Int32 rank)
611{
612 Timer::Phase tphase(timeStats(),TP_Communication);
613 m_mpi_serialize_dispatcher->broadcastSerializer(values,MessageRank(rank));
614}
615
616/*---------------------------------------------------------------------------*/
617/*---------------------------------------------------------------------------*/
618
619void MpiParallelMng::
620recvSerializer(ISerializer* values,Int32 rank)
621{
622 Trace::Setter mci(m_trace,"Mpi");
623 Timer::Phase tphase(timeStats(),TP_Communication);
625 m_mpi_serialize_dispatcher->legacyReceiveSerializer(values,MessageRank(rank),mpi_tag);
626}
627
628/*---------------------------------------------------------------------------*/
629/*---------------------------------------------------------------------------*/
630
633{
634 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
635}
636
637/*---------------------------------------------------------------------------*/
638/*---------------------------------------------------------------------------*/
639
641probe(const PointToPointMessageInfo& message) -> MessageId
642{
643 return m_adapter->probeMessage(message);
644}
645
646/*---------------------------------------------------------------------------*/
647/*---------------------------------------------------------------------------*/
648
651{
652 return m_adapter->legacyProbeMessage(message);
653}
654
655/*---------------------------------------------------------------------------*/
656/*---------------------------------------------------------------------------*/
657
658Request MpiParallelMng::
659sendSerializer(const ISerializer* s,const PointToPointMessageInfo& message)
660{
661 return m_mpi_serialize_dispatcher->sendSerializer(s,message);
662}
663
664/*---------------------------------------------------------------------------*/
665/*---------------------------------------------------------------------------*/
666
667Request MpiParallelMng::
668receiveSerializer(ISerializer* s,const PointToPointMessageInfo& message)
669{
670 return m_mpi_serialize_dispatcher->receiveSerializer(s,message);
671}
672
673/*---------------------------------------------------------------------------*/
674/*---------------------------------------------------------------------------*/
675
678{
679 for( Integer i=0, is=requests.size(); i<is; ++i )
680 m_adapter->freeRequest(requests[i]);
681}
682
683/*---------------------------------------------------------------------------*/
684/*---------------------------------------------------------------------------*/
685
686void MpiParallelMng::
687_checkFinishedSubRequests()
688{
689 m_mpi_serialize_dispatcher->checkFinishedSubRequests();
690}
691
692/*---------------------------------------------------------------------------*/
693/*---------------------------------------------------------------------------*/
694
695Ref<IParallelMng> MpiParallelMng::
696sequentialParallelMngRef()
697{
698 return m_sequential_parallel_mng;
699}
700
703{
704 return m_sequential_parallel_mng.get();
705}
706
707/*---------------------------------------------------------------------------*/
708/*---------------------------------------------------------------------------*/
709
712{
713 if (m_stat)
714 m_stat->print(m_trace);
715}
716
717/*---------------------------------------------------------------------------*/
718/*---------------------------------------------------------------------------*/
719
721barrier()
722{
723 traceMng()->flush();
724 m_adapter->barrier();
725}
726
727/*---------------------------------------------------------------------------*/
728/*---------------------------------------------------------------------------*/
729
732{
733 m_adapter->waitAllRequests(requests);
734 _checkFinishedSubRequests();
735}
736
737/*---------------------------------------------------------------------------*/
738/*---------------------------------------------------------------------------*/
739
742{
743 return _waitSomeRequests(requests, false);
744}
745
746/*---------------------------------------------------------------------------*/
747/*---------------------------------------------------------------------------*/
748
751{
752 return _waitSomeRequests(requests, true);
753}
754
755/*---------------------------------------------------------------------------*/
756/*---------------------------------------------------------------------------*/
757
758UniqueArray<Integer> MpiParallelMng::
759_waitSomeRequests(ArrayView<Request> requests, bool is_non_blocking)
760{
761 UniqueArray<Integer> results;
762 UniqueArray<bool> done_indexes(requests.size());
763
764 m_adapter->waitSomeRequests(requests, done_indexes, is_non_blocking);
765 for (int i = 0 ; i < requests.size() ; i++) {
766 if (done_indexes[i])
767 results.add(i);
768 }
769 return results;
770}
771
772/*---------------------------------------------------------------------------*/
773/*---------------------------------------------------------------------------*/
774
775ISerializeMessageList* MpiParallelMng::
776_createSerializeMessageList()
777{
778 return new MP::internal::SerializeMessageList(messagePassingMng());
779}
780
781/*---------------------------------------------------------------------------*/
782/*---------------------------------------------------------------------------*/
783
786{
787 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
788}
789
790/*---------------------------------------------------------------------------*/
791/*---------------------------------------------------------------------------*/
792
795{
796 return m_utils_factory->createTransferValuesOperation(this)._release();
797}
798
799/*---------------------------------------------------------------------------*/
800/*---------------------------------------------------------------------------*/
801
804{
805 return m_utils_factory->createExchanger(this)._release();
806}
807
808/*---------------------------------------------------------------------------*/
809/*---------------------------------------------------------------------------*/
810
813{
814 return m_utils_factory->createSynchronizer(this,family)._release();
815}
816
817/*---------------------------------------------------------------------------*/
818/*---------------------------------------------------------------------------*/
819
821createSynchronizer(const ItemGroup& group)
822{
823 return m_utils_factory->createSynchronizer(this,group)._release();
824}
825
826/*---------------------------------------------------------------------------*/
827/*---------------------------------------------------------------------------*/
828
831{
832 return m_utils_factory->createTopology(this)._release();
833}
834
835/*---------------------------------------------------------------------------*/
836/*---------------------------------------------------------------------------*/
837
839replication() const
840{
841 return m_replication;
842}
843
844/*---------------------------------------------------------------------------*/
845/*---------------------------------------------------------------------------*/
846
849{
850 delete m_replication;
851 m_replication = v;
852}
853
854/*---------------------------------------------------------------------------*/
855/*---------------------------------------------------------------------------*/
856
857IParallelMng* MpiParallelMng::
858_createSubParallelMng(MPI_Comm sub_communicator)
859{
860 // Si nul, ce rang ne fait pas partie du sous-communicateur
861 if (sub_communicator==MPI_COMM_NULL)
862 return nullptr;
863
864 int sub_rank = -1;
865 MPI_Comm_rank(sub_communicator,&sub_rank);
866
867 MPI_Comm sub_machine_communicator = MPI_COMM_NULL;
868 MPI_Comm_split_type(sub_communicator, MPI_COMM_TYPE_SHARED, sub_rank, MPI_INFO_NULL, &sub_machine_communicator);
869
870 MpiParallelMngBuildInfo bi(sub_communicator, sub_machine_communicator);
871 bi.is_parallel = isParallel();
872 bi.stat = m_stat;
873 bi.timer_mng = m_timer_mng;
874 bi.thread_mng = m_thread_mng;
875 bi.trace_mng = m_trace;
876 bi.world_parallel_mng = m_world_parallel_mng;
877 bi.mpi_lock = m_mpi_lock;
878
879 IParallelMng* sub_pm = new MpiParallelMng(bi);
880 sub_pm->build();
881 return sub_pm;
882}
883
884/*---------------------------------------------------------------------------*/
885/*---------------------------------------------------------------------------*/
886
887Ref<IParallelMng> MpiParallelMng::
888_createSubParallelMngRef(Int32 color, Int32 key)
889{
890 if (color < 0)
891 color = MPI_UNDEFINED;
892 MPI_Comm sub_communicator = MPI_COMM_NULL;
893 MPI_Comm_split(m_communicator, color, key, &sub_communicator);
894 IParallelMng* sub_pm = _createSubParallelMng(sub_communicator);
895 return makeRef(sub_pm);
896}
897
898/*---------------------------------------------------------------------------*/
899/*---------------------------------------------------------------------------*/
900
901IParallelMng* MpiParallelMng::
902_createSubParallelMng(Int32ConstArrayView kept_ranks)
903{
904 MPI_Group mpi_group = MPI_GROUP_NULL;
905 MPI_Comm_group(m_communicator, &mpi_group);
906 Integer nb_sub_rank = kept_ranks.size();
907 UniqueArray<int> mpi_kept_ranks(nb_sub_rank);
908 for (Integer i = 0; i < nb_sub_rank; ++i)
909 mpi_kept_ranks[i] = (int)kept_ranks[i];
910
911 MPI_Group final_group = MPI_GROUP_NULL;
912 MPI_Group_incl(mpi_group, nb_sub_rank, mpi_kept_ranks.data(), &final_group);
913 MPI_Comm sub_communicator = MPI_COMM_NULL;
914
915 MPI_Comm_create(m_communicator, final_group, &sub_communicator);
916 MPI_Group_free(&final_group);
917 return _createSubParallelMng(sub_communicator);
918}
919
920/*---------------------------------------------------------------------------*/
921/*---------------------------------------------------------------------------*/
930: public MpiRequestList
931{
932 using Base = MpiRequestList;
933 public:
934 explicit RequestList(MpiParallelMng* pm)
935 : Base(pm->m_adapter), m_parallel_mng(pm){}
936 public:
937 void _wait(Parallel::eWaitType wait_type) override
938 {
939 Base::_wait(wait_type);
940 m_parallel_mng->_checkFinishedSubRequests();
941 };
942 private:
943 MpiParallelMng* m_parallel_mng;
944};
945
946/*---------------------------------------------------------------------------*/
947/*---------------------------------------------------------------------------*/
948
954
955/*---------------------------------------------------------------------------*/
956/*---------------------------------------------------------------------------*/
957
960{
961 return m_utils_factory;
962}
963
964/*---------------------------------------------------------------------------*/
965/*---------------------------------------------------------------------------*/
966
967bool MpiParallelMng::
968_isAcceleratorAware() const
969{
971}
972
973/*---------------------------------------------------------------------------*/
974/*---------------------------------------------------------------------------*/
975
976} // End namespace Arcane
977
978/*---------------------------------------------------------------------------*/
979/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Vue modifiable d'un tableau d'un type T.
constexpr Integer size() const noexcept
Retourne la taille du tableau.
const T * data() const
Accès à la racine du tableau hors toute protection.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:36
Interface d'une famille d'entités.
Definition IItemFamily.h:84
virtual ItemGroup allItems() const =0
Groupe de toutes les entités.
Échange d'informations entre processeurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual ITraceMng * traceMng() const =0
Gestionnaire de traces.
virtual void build()=0
Construit l'instance.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface du gestionnaire de traces.
virtual void flush()=0
Flush tous les flots.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un communicateur MPI spécifique pour les synchronisations.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
eItemKind itemKind() const
Genre du groupe. Il s'agit du genre de ses éléments.
Definition ItemGroup.h:109
bool isAllItems() const
Indique si le groupe est celui de toutes les entités.
Definition ItemGroup.cc:610
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
void _wait(eWaitType wait_type) override
Effectue l'attente ou le test.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Message de réception.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Informations pour envoyer/recevoir un message point à point.
Requête d'un message.
Definition Request.h:77
Message de sérialisation utilisant un BasicSerializer.
static MessageTag defaultTag()
Tag par défaut pour les messages de sérialisation.
Gère les MPI_Datatype associées aux types Arcane.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, const ItemGroup &group) override
Retourne une interface pour synchroniser des variables sur le groupe group.
Ref< IDynamicMachineMemoryWindowBaseInternal > createDynamicMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire dynamique sur le noeud.
Ref< IMachineMemoryWindowBaseInternal > createMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Spécialisation de MpiRequestList pour MpiParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant MPI.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void barrier() override
Effectue une barière.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Bloque en attendant qu'une des requêtes rvalues soit terminée.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void build() override
Construit l'instance.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
bool m_is_initialized
true si déjà initialisé
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IThreadMng * threadMng() const override
Gestionnaire de threads.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void initialize() override
Initialise le gestionnaire du parallélisme.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
bool isParallel() const override
Retourne true si l'exécution est parallèle.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
IParallelReplication * replication() const override
Informations sur la réplication.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Test si une des requêtes rvalues est terminée.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
Gestionnaire du parallélisme utilisant MPI.
Gestionnaire de timer utisant la bibliothèque MPI.
Definition MpiTimerMng.h:39
void compute() override
Recalcule les infos de synchronisation.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeMetricCollector * timeMetricCollector() const override
Collecteur Arccore des statistiques temporelles (peut être nul)
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Chaîne de caractères unicode.
bool null() const
Retourne true si la chaîne est nulle.
Definition String.cc:305
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
MPI_Comm communicator() const override
Récupère le communicateur spécifique de la topologie.
void compute(VariableSynchronizer *var_syncer) override
Calcul le communicateur spécifique.
Interface d'un service de synchronisation de variable.
void compute() override
Création de la liste des éléments de synchronisation.
Int32ConstArrayView communicatingRanks() override
Rangs des sous-domaines avec lesquels on communique.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
ARCCORE_BASE_EXPORT String getEnvironmentVariable(const String &name)
Variable d'environnement du nom name.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
ARCANE_MPI_EXPORT bool arcaneIsAcceleratorAwareMPI()
Indique si le runtime actuel de MPI a le support des accélérateurs.
Definition ArcaneMpi.cc:82
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:125
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:486
@ IK_Cell
Entité de maillage de genre maille.
void arcaneCallFunctionAndTerminateIfThrow(std::function< void()> function)
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un MpiParallelMng.
Infos pour construire un SequentialParallelMng.