14#include "arcane/utils/Collection.h"
15#include "arcane/utils/Enumerator.h"
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/TimeoutException.h"
19#include "arcane/utils/NotImplementedException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/ITraceMng.h"
22#include "arcane/utils/ValueConvert.h"
23#include "arcane/utils/Exception.h"
24#include "arcane/utils/HPReal.h"
26#include "arcane/core/IIOMng.h"
27#include "arcane/core/Timer.h"
28#include "arcane/core/IItemFamily.h"
29#include "arcane/core/parallel/IStat.h"
30#include "arcane/core/internal/SerializeMessage.h"
31#include "arcane/core/internal/ParallelMngInternal.h"
33#include "arcane/parallel/mpi/MpiParallelMng.h"
34#include "arcane/parallel/mpi/MpiParallelDispatch.h"
35#include "arcane/parallel/mpi/MpiTimerMng.h"
36#include "arcane/parallel/mpi/MpiSerializeMessage.h"
37#include "arcane/parallel/mpi/MpiParallelNonBlockingCollective.h"
38#include "arcane/parallel/mpi/MpiDatatype.h"
39#include "arcane/parallel/mpi/IVariableSynchronizerMpiCommunicator.h"
41#include "arcane/impl/ParallelReplication.h"
42#include "arcane/impl/SequentialParallelMng.h"
43#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
44#include "arcane/impl/internal/VariableSynchronizer.h"
46#include "arccore/message_passing_mpi/MpiMessagePassingMng.h"
47#include "arccore/message_passing_mpi/internal/MpiSerializeDispatcher.h"
48#include "arccore/message_passing_mpi/internal/MpiRequestList.h"
49#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
50#include "arccore/message_passing_mpi/internal/MpiLock.h"
51#include "arccore/message_passing_mpi/internal/MpiMachineMemoryWindowBaseInternalCreator.h"
52#include "arccore/message_passing_mpi/internal/MpiMachineMemoryWindowBaseInternal.h"
53#include "arccore/message_passing/Dispatchers.h"
55#include "arccore/message_passing/SerializeMessageList.h"
65using namespace Arcane::MessagePassing::Mpi;
74#if defined(ARCANE_HAS_MPI_NEIGHBOR)
77arcaneCreateMpiNeighborVariableSynchronizerFactory(
MpiParallelMng* mpi_pm,
85arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(
MpiParallelMng* mpi_pm);
87arcaneCreateMpiLegacyVariableSynchronizerFactory(
MpiParallelMng* mpi_pm);
92MpiParallelMngBuildInfo::
93MpiParallelMngBuildInfo(MPI_Comm comm)
102, is_mpi_comm_owned(true)
105 ::MPI_Comm_rank(comm,&comm_rank);
106 ::MPI_Comm_size(comm,&comm_nb_rank);
109 MP::Mpi::MpiMessagePassingMng::BuildInfo bi(comm_rank,comm_nb_rank,m_dispatchers_ref.get(),mpi_comm);
122class VariableSynchronizerMpiCommunicator
127 : m_mpi_parallel_mng(pm){}
128 ~VariableSynchronizerMpiCommunicator()
override
130 _checkFreeCommunicator();
134 return m_topology_communicator;
139 const Int32 nb_message = comm_ranks.
size();
146 for(
Integer i=0; i<nb_message; ++i ){
147 destinations[i] = comm_ranks[i];
150 _checkFreeCommunicator();
152 int r = MPI_Dist_graph_create_adjacent(old_comm, nb_message, destinations.
data(), MPI_UNWEIGHTED,
153 nb_message, destinations.
data(), MPI_UNWEIGHTED,
154 MPI_INFO_NULL, 0, &m_topology_communicator);
165 MPI_Dist_graph_neighbors_count(m_topology_communicator,&indegree,&outdegree,&weighted);
167 if (indegree!=nb_message)
168 ARCANE_FATAL(
"Bad value '{0}' for 'indegree' (expected={1})",indegree,nb_message);
169 if (outdegree!=nb_message)
170 ARCANE_FATAL(
"Bad value '{0}' for 'outdegree' (expected={1})",outdegree,nb_message);
175 MPI_Dist_graph_neighbors(m_topology_communicator,indegree,srcs.
data(),MPI_UNWEIGHTED,outdegree,dsts.
data(),MPI_UNWEIGHTED);
177 for(
int k=0; k<outdegree; ++k){
179 if (x!=comm_ranks[k])
180 ARCANE_FATAL(
"Invalid destination rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
183 for(
int k=0; k<indegree; ++k ){
185 if (x!=comm_ranks[k])
186 ARCANE_FATAL(
"Invalid source rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
194 MPI_Comm m_topology_communicator = MPI_COMM_NULL;
198 void _checkFreeCommunicator()
200 if (m_topology_communicator!=MPI_COMM_NULL)
201 MPI_Comm_free(&m_topology_communicator);
202 m_topology_communicator = MPI_COMM_NULL;
214class MpiVariableSynchronizer
215:
public VariableSynchronizer
221 : VariableSynchronizer(pm,group,implementation_factory)
222 , m_topology_info(topology_info)
230 if (m_topology_info.get())
231 m_topology_info->compute(
this);
240class MpiParallelMngUtilsFactory
244 MpiParallelMngUtilsFactory()
245 : m_synchronizer_version(2)
248 m_synchronizer_version = 1;
250 m_synchronizer_version = 2;
252 m_synchronizer_version = 3;
254 m_synchronizer_version = 4;
257 Int32 block_size = 0;
258 if (!builtInGetValue(block_size,v))
259 m_synchronize_block_size = block_size;
260 m_synchronize_block_size = std::clamp(m_synchronize_block_size,0,1000000000);
264 Int32 nb_sequence = 0;
265 if (!builtInGetValue(nb_sequence,v))
266 m_synchronize_nb_sequence = nb_sequence;
267 m_synchronize_nb_sequence = std::clamp(m_synchronize_nb_sequence,1,1024*1024);
271 m_synchronizer_version = 5;
277 return _createSynchronizer(pm,family->
allItems());
282 return _createSynchronizer(pm,group);
296 if (m_synchronizer_version == 2){
298 tm->
info() <<
"Using MpiSynchronizer V2";
299 generic_factory = arcaneCreateMpiVariableSynchronizerFactory(mpi_pm);
301 else if (m_synchronizer_version == 3 ){
303 tm->
info() <<
"Using MpiSynchronizer V3";
304 generic_factory = arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(mpi_pm);
306 else if (m_synchronizer_version == 4){
308 tm->
info() <<
"Using MpiSynchronizer V4 block_size=" << m_synchronize_block_size
309 <<
" nb_sequence=" << m_synchronize_nb_sequence;
310 generic_factory = arcaneCreateMpiBlockVariableSynchronizerFactory(mpi_pm,m_synchronize_block_size,m_synchronize_nb_sequence);
312 else if (m_synchronizer_version == 5){
314 tm->
info() <<
"Using MpiSynchronizer V5";
315 topology_info = createRef<VariableSynchronizerMpiCommunicator>(mpi_pm);
316#if defined(ARCANE_HAS_MPI_NEIGHBOR)
317 generic_factory = arcaneCreateMpiNeighborVariableSynchronizerFactory(mpi_pm,topology_info);
319 throw NotSupportedException(A_FUNCINFO,
"Synchronize implementation V5 is not supported with this version of MPI");
324 tm->
info() <<
"Using MpiSynchronizer V1";
325 generic_factory = arcaneCreateMpiLegacyVariableSynchronizerFactory(mpi_pm);
327 if (!generic_factory.
get())
329 return createRef<MpiVariableSynchronizer>(pm,group,generic_factory,topology_info);
334 Integer m_synchronizer_version = 1;
335 Int32 m_synchronize_block_size = 32000;
336 Int32 m_synchronize_nb_sequence = 1;
346:
public ParallelMngInternal
350 explicit Impl(MpiParallelMng* pm)
351 : ParallelMngInternal(pm)
355 ~Impl()
override =
default;
361 return makeRef(m_parallel_mng->adapter()->windowCreator()->createWindow(sizeof_segment, sizeof_type));
366 MpiParallelMng* m_parallel_mng;
378, m_trace(bi.trace_mng)
379, m_thread_mng(bi.thread_mng)
380, m_world_parallel_mng(bi.world_parallel_mng)
381, m_timer_mng(bi.timer_mng)
383, m_is_parallel(bi.is_parallel)
384, m_comm_rank(bi.commRank())
385, m_comm_size(bi.commSize())
387, m_communicator(bi.mpiComm())
388, m_is_communicator_owned(bi.is_mpi_comm_owned)
389, m_mpi_lock(bi.mpi_lock)
390, m_non_blocking_collective(nullptr)
392, m_parallel_mng_internal(new Impl(this))
394 if (!m_world_parallel_mng){
395 m_trace->debug()<<
"[MpiParallelMng] No m_world_parallel_mng found, reverting to ourselves!";
396 m_world_parallel_mng =
this;
406 delete m_parallel_mng_internal;
407 delete m_non_blocking_collective;
408 m_sequential_parallel_mng.reset();
409 if (m_is_communicator_owned){
410 MpiLock::Section ls(m_mpi_lock);
411 MPI_Comm_free(&m_communicator);
413 delete m_replication;
415 if (m_is_timer_owned)
418 delete m_datatype_list;
433 DispatchCreator(ITraceMng* tm,IMessagePassingMng* mpm,MpiAdapter* adapter,MpiDatatypeList* datatype_list)
434 : m_tm(tm), m_mpm(mpm), m_adapter(adapter), m_datatype_list(datatype_list){}
436 template<
typename DataType> MpiParallelDispatchT<DataType>*
439 MpiDatatype* dt = m_datatype_list->datatype(DataType());
440 return new MpiParallelDispatchT<DataType>(m_tm,m_mpm,m_adapter,dt);
444 IMessagePassingMng* m_mpm;
445 MpiAdapter* m_adapter;
446 MpiDatatypeList* m_datatype_list;
452class ControlDispatcherDecorator
453:
public ParallelMngDispatcher::DefaultControlDispatcher
457 ControlDispatcherDecorator(IParallelMng* pm, MpiAdapter* adapter)
458 : ParallelMngDispatcher::DefaultControlDispatcher(pm), m_adapter(adapter) {}
460 IMessagePassingMng* commSplit(
bool keep)
override
462 return m_adapter->commSplit(keep);
464 MP::IProfiler* profiler()
const override {
return m_adapter->profiler(); }
465 void setProfiler(MP::IProfiler* p)
override { m_adapter->setProfiler(p); }
468 MpiAdapter* m_adapter;
481 m_is_timer_owned =
true;
490 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
495 bool is_ordered_reduce =
false;
497 is_ordered_reduce =
true;
502 MpiAdapter* adapter =
new MpiAdapter(m_trace,m_stat->toArccoreStat(),m_communicator,m_mpi_lock);
504 auto mpm = _messagePassingMng();
507 auto* control_dispatcher =
new ControlDispatcherDecorator(
this,m_adapter);
508 _setControlDispatcher(control_dispatcher);
512 m_mpi_serialize_dispatcher = serialize_dispatcher;
513 _setSerializeDispatcher(serialize_dispatcher);
515 DispatchCreator creator(m_trace,mpm,m_adapter,m_datatype_list);
516 this->createDispatchers(creator);
518 m_io_mng = arcaneCreateIOMng(
this);
521 m_non_blocking_collective->build();
523 m_trace->info() <<
"Using mpi with locks.";
537 m_trace->warning() <<
"MpiParallelMng already initialized";
541 m_trace->info() <<
"Initialisation de MpiParallelMng";
542 m_sequential_parallel_mng->initialize();
558 m_mpi_serialize_dispatcher->legacySendSerializer(s,{
MessageRank(rank),mpi_tag,Blocking});
567 return m_utils_factory->createSendSerializeMessage(
this, rank)._release();
579 return m_mpi_serialize_dispatcher->legacySendSerializer(s,{
MessageRank(rank),mpi_tag,NonBlocking});
589 m_mpi_serialize_dispatcher->broadcastSerializer(values,
MessageRank(rank));
601 m_mpi_serialize_dispatcher->legacyReceiveSerializer(values,
MessageRank(rank),mpi_tag);
610 return m_utils_factory->createReceiveSerializeMessage(
this, rank)._release();
619 return m_adapter->probeMessage(message);
628 return m_adapter->legacyProbeMessage(message);
656 m_adapter->freeRequest(requests[i]);
663_checkFinishedSubRequests()
665 m_mpi_serialize_dispatcher->checkFinishedSubRequests();
672sequentialParallelMngRef()
674 return m_sequential_parallel_mng;
680 return m_sequential_parallel_mng.get();
690 m_stat->print(m_trace);
700 m_adapter->barrier();
709 m_adapter->waitAllRequests(requests);
710 _checkFinishedSubRequests();
719 return _waitSomeRequests(requests,
false);
728 return _waitSomeRequests(requests,
true);
740 m_adapter->waitSomeRequests(requests, done_indexes, is_non_blocking);
741 for (
int i = 0 ; i < requests.
size() ; i++) {
751ISerializeMessageList* MpiParallelMng::
752_createSerializeMessageList()
763 return m_utils_factory->createGetVariablesValuesOperation(
this)._release();
772 return m_utils_factory->createTransferValuesOperation(
this)._release();
781 return m_utils_factory->createExchanger(
this)._release();
790 return m_utils_factory->createSynchronizer(
this,family)._release();
799 return m_utils_factory->createSynchronizer(
this,group)._release();
808 return m_utils_factory->createTopology(
this)._release();
817 return m_replication;
826 delete m_replication;
834_createSubParallelMng(MPI_Comm sub_communicator)
837 if (sub_communicator==MPI_COMM_NULL)
841 MPI_Comm_rank(sub_communicator,&sub_rank);
846 bi.timer_mng = m_timer_mng;
847 bi.thread_mng = m_thread_mng;
848 bi.trace_mng = m_trace;
849 bi.world_parallel_mng = m_world_parallel_mng;
850 bi.mpi_lock = m_mpi_lock;
861_createSubParallelMngRef(Int32 color, Int32 key)
864 color = MPI_UNDEFINED;
865 MPI_Comm sub_communicator = MPI_COMM_NULL;
866 MPI_Comm_split(m_communicator, color, key, &sub_communicator);
867 IParallelMng* sub_pm = _createSubParallelMng(sub_communicator);
874IParallelMng* MpiParallelMng::
877 MPI_Group mpi_group = MPI_GROUP_NULL;
878 MPI_Comm_group(m_communicator, &mpi_group);
880 UniqueArray<int> mpi_kept_ranks(nb_sub_rank);
881 for (
Integer i = 0; i < nb_sub_rank; ++i)
882 mpi_kept_ranks[i] = (
int)kept_ranks[i];
884 MPI_Group final_group = MPI_GROUP_NULL;
885 MPI_Group_incl(mpi_group, nb_sub_rank, mpi_kept_ranks.data(), &final_group);
886 MPI_Comm sub_communicator = MPI_COMM_NULL;
888 MPI_Comm_create(m_communicator, final_group, &sub_communicator);
889 MPI_Group_free(&final_group);
890 return _createSubParallelMng(sub_communicator);
903:
public MpiRequestList
905 using Base = MpiRequestList;
907 explicit RequestList(MpiParallelMng* pm)
908 : Base(pm->m_adapter), m_parallel_mng(pm){}
913 m_parallel_mng->_checkFinishedSubRequests();
916 MpiParallelMng* m_parallel_mng;
934 return m_utils_factory;
941_isAcceleratorAware()
const
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Vue modifiable d'un tableau d'un type T.
constexpr Integer size() const noexcept
Retourne la taille du tableau.
const T * data() const
Accès à la racine du tableau hors toute protection.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Interface d'une famille d'entités.
virtual ItemGroup allItems() const =0
Groupe de toutes les entités.
Échange d'informations entre processeurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual ITraceMng * traceMng() const =0
Gestionnaire de traces.
virtual void build()=0
Construit l'instance.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface d'un sérialiseur.
Interface du gestionnaire de traces.
virtual void flush()=0
Flush tous les flots.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un communicateur MPI spécifique pour les synchronisations.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
eItemKind itemKind() const
Genre du groupe. Il s'agit du genre de ses éléments.
bool isAllItems() const
Indique si le groupe est celui de toutes les entités.
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
void _wait(eWaitType wait_type) override
Effectue l'attente ou le test.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Message de réception.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Informations pour envoyer/recevoir un message point à point.
Message de sérialisation utilisant un BasicSerializer.
static MessageTag defaultTag()
Tag par défaut pour les messages de sérialisation.
Gère les MPI_Datatype associées aux types Arcane.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, const ItemGroup &group) override
Retourne une interface pour synchroniser des variables sur le groupe group.
Ref< IMachineMemoryWindowBaseInternal > createMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Spécialisation de MpiRequestList pour MpiParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant MPI.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void barrier() override
Effectue une barière.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Bloque en attendant qu'une des requêtes rvalues soit terminée.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void build() override
Construit l'instance.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
bool m_is_initialized
true si déjà initialisé
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IThreadMng * threadMng() const override
Gestionnaire de threads.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void initialize() override
Initialise le gestionnaire du parallélisme.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
bool isParallel() const override
Retourne true si l'exécution est parallèle.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
IParallelReplication * replication() const override
Informations sur la réplication.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Test si une des requêtes rvalues est terminée.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
Gestionnaire du parallélisme utilisant MPI.
Gestionnaire de timer utisant la bibliothèque MPI.
void compute() override
Recalcule les infos de synchronisation.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeMetricCollector * timeMetricCollector() const override
Collecteur Arccore des statistiques temporelles (peut être nul)
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Chaîne de caractères unicode.
bool null() const
Retourne true si la chaîne est nulle.
Positionne la phase de l'action en cours d'exécution.
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
MPI_Comm communicator() const override
Récupère le communicateur spécifique de la topologie.
void compute(VariableSynchronizer *var_syncer) override
Calcul le communicateur spécifique.
Interface d'un service de synchronisation de variable.
void compute() override
Création de la liste des éléments de synchronisation.
Int32ConstArrayView communicatingRanks() override
Rangs des sous-domaines avec lesquels on communique.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
ARCANE_MPI_EXPORT bool arcaneIsAcceleratorAwareMPI()
Indique si le runtime actuel de MPI a le support des accélérateurs.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
@ IK_Cell
Entité de maillage de genre maille.
void arcaneCallFunctionAndTerminateIfThrow(std::function< void()> function)
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un MpiParallelMng.
Infos pour construire un SequentialParallelMng.