14#include "arcane/utils/Collection.h"
15#include "arcane/utils/Enumerator.h"
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/TimeoutException.h"
19#include "arcane/utils/NotImplementedException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/ITraceMng.h"
22#include "arcane/utils/ValueConvert.h"
23#include "arcane/utils/Exception.h"
24#include "arcane/utils/HPReal.h"
26#include "arcane/core/IIOMng.h"
27#include "arcane/core/Timer.h"
28#include "arcane/core/IItemFamily.h"
29#include "arcane/core/parallel/IStat.h"
30#include "arcane/core/internal/SerializeMessage.h"
32#include "arcane/parallel/mpi/MpiParallelMng.h"
33#include "arcane/parallel/mpi/MpiParallelDispatch.h"
34#include "arcane/parallel/mpi/MpiTimerMng.h"
35#include "arcane/parallel/mpi/MpiSerializeMessage.h"
36#include "arcane/parallel/mpi/MpiParallelNonBlockingCollective.h"
37#include "arcane/parallel/mpi/MpiDatatype.h"
38#include "arcane/parallel/mpi/IVariableSynchronizerMpiCommunicator.h"
40#include "arcane/impl/ParallelReplication.h"
41#include "arcane/impl/SequentialParallelMng.h"
42#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
43#include "arcane/impl/internal/VariableSynchronizer.h"
45#include "arccore/message_passing_mpi/MpiMessagePassingMng.h"
46#include "arccore/message_passing_mpi/internal/MpiSerializeDispatcher.h"
47#include "arccore/message_passing_mpi/internal/MpiRequestList.h"
48#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
49#include "arccore/message_passing_mpi/internal/MpiLock.h"
50#include "arccore/message_passing/Dispatchers.h"
52#include "arccore/message_passing/SerializeMessageList.h"
62using namespace Arcane::MessagePassing::Mpi;
71#if defined(ARCANE_HAS_MPI_NEIGHBOR)
74arcaneCreateMpiNeighborVariableSynchronizerFactory(
MpiParallelMng* mpi_pm,
82arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(
MpiParallelMng* mpi_pm);
84arcaneCreateMpiLegacyVariableSynchronizerFactory(
MpiParallelMng* mpi_pm);
89MpiParallelMngBuildInfo::
90MpiParallelMngBuildInfo(MPI_Comm comm)
99, is_mpi_comm_owned(true)
102 ::MPI_Comm_rank(comm,&comm_rank);
103 ::MPI_Comm_size(comm,&comm_nb_rank);
106 MP::Mpi::MpiMessagePassingMng::BuildInfo bi(comm_rank,comm_nb_rank,m_dispatchers_ref.get(),mpi_comm);
119class VariableSynchronizerMpiCommunicator
124 : m_mpi_parallel_mng(pm){}
125 ~VariableSynchronizerMpiCommunicator()
override
127 _checkFreeCommunicator();
131 return m_topology_communicator;
136 const Int32 nb_message = comm_ranks.
size();
143 for(
Integer i=0; i<nb_message; ++i ){
144 destinations[i] = comm_ranks[i];
147 _checkFreeCommunicator();
149 int r = MPI_Dist_graph_create_adjacent(old_comm, nb_message, destinations.
data(), MPI_UNWEIGHTED,
150 nb_message, destinations.
data(), MPI_UNWEIGHTED,
151 MPI_INFO_NULL, 0, &m_topology_communicator);
162 MPI_Dist_graph_neighbors_count(m_topology_communicator,&indegree,&outdegree,&weighted);
164 if (indegree!=nb_message)
165 ARCANE_FATAL(
"Bad value '{0}' for 'indegree' (expected={1})",indegree,nb_message);
166 if (outdegree!=nb_message)
167 ARCANE_FATAL(
"Bad value '{0}' for 'outdegree' (expected={1})",outdegree,nb_message);
172 MPI_Dist_graph_neighbors(m_topology_communicator,indegree,srcs.
data(),MPI_UNWEIGHTED,outdegree,dsts.
data(),MPI_UNWEIGHTED);
174 for(
int k=0; k<outdegree; ++k){
176 if (x!=comm_ranks[k])
177 ARCANE_FATAL(
"Invalid destination rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
180 for(
int k=0; k<indegree; ++k ){
182 if (x!=comm_ranks[k])
183 ARCANE_FATAL(
"Invalid source rank order k={0} v={1} expected={2}",k,x,comm_ranks[k]);
191 MPI_Comm m_topology_communicator = MPI_COMM_NULL;
195 void _checkFreeCommunicator()
197 if (m_topology_communicator!=MPI_COMM_NULL)
198 MPI_Comm_free(&m_topology_communicator);
199 m_topology_communicator = MPI_COMM_NULL;
211class MpiVariableSynchronizer
212:
public VariableSynchronizer
218 : VariableSynchronizer(pm,group,implementation_factory)
219 , m_topology_info(topology_info)
227 if (m_topology_info.get())
228 m_topology_info->compute(
this);
237class MpiParallelMngUtilsFactory
241 MpiParallelMngUtilsFactory()
242 : m_synchronizer_version(2)
245 m_synchronizer_version = 1;
247 m_synchronizer_version = 2;
249 m_synchronizer_version = 3;
251 m_synchronizer_version = 4;
254 Int32 block_size = 0;
255 if (!builtInGetValue(block_size,v))
256 m_synchronize_block_size = block_size;
257 m_synchronize_block_size = std::clamp(m_synchronize_block_size,0,1000000000);
261 Int32 nb_sequence = 0;
262 if (!builtInGetValue(nb_sequence,v))
263 m_synchronize_nb_sequence = nb_sequence;
264 m_synchronize_nb_sequence = std::clamp(m_synchronize_nb_sequence,1,1024*1024);
268 m_synchronizer_version = 5;
274 return _createSynchronizer(pm,family->
allItems());
279 return _createSynchronizer(pm,group);
293 if (m_synchronizer_version == 2){
295 tm->
info() <<
"Using MpiSynchronizer V2";
296 generic_factory = arcaneCreateMpiVariableSynchronizerFactory(mpi_pm);
298 else if (m_synchronizer_version == 3 ){
300 tm->
info() <<
"Using MpiSynchronizer V3";
301 generic_factory = arcaneCreateMpiDirectSendrecvVariableSynchronizerFactory(mpi_pm);
303 else if (m_synchronizer_version == 4){
305 tm->
info() <<
"Using MpiSynchronizer V4 block_size=" << m_synchronize_block_size
306 <<
" nb_sequence=" << m_synchronize_nb_sequence;
307 generic_factory = arcaneCreateMpiBlockVariableSynchronizerFactory(mpi_pm,m_synchronize_block_size,m_synchronize_nb_sequence);
309 else if (m_synchronizer_version == 5){
311 tm->
info() <<
"Using MpiSynchronizer V5";
312 topology_info = createRef<VariableSynchronizerMpiCommunicator>(mpi_pm);
313#if defined(ARCANE_HAS_MPI_NEIGHBOR)
314 generic_factory = arcaneCreateMpiNeighborVariableSynchronizerFactory(mpi_pm,topology_info);
316 throw NotSupportedException(A_FUNCINFO,
"Synchronize implementation V5 is not supported with this version of MPI");
321 tm->
info() <<
"Using MpiSynchronizer V1";
322 generic_factory = arcaneCreateMpiLegacyVariableSynchronizerFactory(mpi_pm);
324 if (!generic_factory.
get())
326 return createRef<MpiVariableSynchronizer>(pm,group,generic_factory,topology_info);
331 Integer m_synchronizer_version = 1;
332 Int32 m_synchronize_block_size = 32000;
333 Int32 m_synchronize_nb_sequence = 1;
345, m_trace(bi.trace_mng)
346, m_thread_mng(bi.thread_mng)
347, m_world_parallel_mng(bi.world_parallel_mng)
348, m_timer_mng(bi.timer_mng)
350, m_is_parallel(bi.is_parallel)
351, m_comm_rank(bi.commRank())
352, m_comm_size(bi.commSize())
354, m_communicator(bi.mpiComm())
355, m_is_communicator_owned(bi.is_mpi_comm_owned)
356, m_mpi_lock(bi.mpi_lock)
357, m_non_blocking_collective(nullptr)
360 if (!m_world_parallel_mng){
361 m_trace->debug()<<
"[MpiParallelMng] No m_world_parallel_mng found, reverting to ourselves!";
362 m_world_parallel_mng =
this;
372 delete m_non_blocking_collective;
373 m_sequential_parallel_mng.reset();
374 if (m_is_communicator_owned){
375 MpiLock::Section ls(m_mpi_lock);
376 MPI_Comm_free(&m_communicator);
378 delete m_replication;
380 if (m_is_timer_owned)
383 delete m_datatype_list;
398 DispatchCreator(ITraceMng* tm,IMessagePassingMng* mpm,MpiAdapter* adapter,MpiDatatypeList* datatype_list)
399 : m_tm(tm), m_mpm(mpm), m_adapter(adapter), m_datatype_list(datatype_list){}
401 template<
typename DataType> MpiParallelDispatchT<DataType>*
404 MpiDatatype* dt = m_datatype_list->datatype(DataType());
405 return new MpiParallelDispatchT<DataType>(m_tm,m_mpm,m_adapter,dt);
409 IMessagePassingMng* m_mpm;
410 MpiAdapter* m_adapter;
411 MpiDatatypeList* m_datatype_list;
417class ControlDispatcherDecorator
422 ControlDispatcherDecorator(IParallelMng* pm, MpiAdapter* adapter)
423 : ParallelMngDispatcher::DefaultControlDispatcher(pm), m_adapter(adapter) {}
425 IMessagePassingMng* commSplit(
bool keep)
override
427 return m_adapter->commSplit(keep);
429 MP::IProfiler* profiler()
const override {
return m_adapter->profiler(); }
430 void setProfiler(MP::IProfiler* p)
override { m_adapter->setProfiler(p); }
433 MpiAdapter* m_adapter;
446 m_is_timer_owned =
true;
455 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
460 bool is_ordered_reduce =
false;
462 is_ordered_reduce =
true;
467 MpiAdapter* adapter =
new MpiAdapter(m_trace,m_stat->toArccoreStat(),m_communicator,m_mpi_lock);
469 auto mpm = _messagePassingMng();
472 auto* control_dispatcher =
new ControlDispatcherDecorator(
this,m_adapter);
473 _setControlDispatcher(control_dispatcher);
477 m_mpi_serialize_dispatcher = serialize_dispatcher;
478 _setSerializeDispatcher(serialize_dispatcher);
480 DispatchCreator creator(m_trace,mpm,m_adapter,m_datatype_list);
481 this->createDispatchers(creator);
483 m_io_mng = arcaneCreateIOMng(
this);
486 m_non_blocking_collective->build();
488 m_trace->info() <<
"Using mpi with locks.";
502 m_trace->warning() <<
"MpiParallelMng already initialized";
506 m_trace->info() <<
"Initialisation de MpiParallelMng";
507 m_sequential_parallel_mng->initialize();
523 m_mpi_serialize_dispatcher->legacySendSerializer(s,{
MessageRank(rank),mpi_tag,Blocking});
532 return m_utils_factory->createSendSerializeMessage(
this, rank)._release();
544 return m_mpi_serialize_dispatcher->legacySendSerializer(s,{
MessageRank(rank),mpi_tag,NonBlocking});
554 m_mpi_serialize_dispatcher->broadcastSerializer(values,
MessageRank(rank));
566 m_mpi_serialize_dispatcher->legacyReceiveSerializer(values,
MessageRank(rank),mpi_tag);
575 return m_utils_factory->createReceiveSerializeMessage(
this, rank)._release();
584 return m_adapter->probeMessage(message);
593 return m_adapter->legacyProbeMessage(message);
621 m_adapter->freeRequest(requests[i]);
628_checkFinishedSubRequests()
630 m_mpi_serialize_dispatcher->checkFinishedSubRequests();
637sequentialParallelMngRef()
639 return m_sequential_parallel_mng;
645 return m_sequential_parallel_mng.get();
655 m_stat->print(m_trace);
665 m_adapter->barrier();
674 m_adapter->waitAllRequests(requests);
675 _checkFinishedSubRequests();
684 return _waitSomeRequests(requests,
false);
693 return _waitSomeRequests(requests,
true);
705 m_adapter->waitSomeRequests(requests, done_indexes, is_non_blocking);
706 for (
int i = 0 ; i < requests.
size() ; i++) {
717_createSerializeMessageList()
728 return m_utils_factory->createGetVariablesValuesOperation(
this)._release();
737 return m_utils_factory->createTransferValuesOperation(
this)._release();
746 return m_utils_factory->createExchanger(
this)._release();
755 return m_utils_factory->createSynchronizer(
this,family)._release();
764 return m_utils_factory->createSynchronizer(
this,group)._release();
773 return m_utils_factory->createTopology(
this)._release();
782 return m_replication;
791 delete m_replication;
799_createSubParallelMng(MPI_Comm sub_communicator)
802 if (sub_communicator==MPI_COMM_NULL)
806 MPI_Comm_rank(sub_communicator,&sub_rank);
811 bi.timer_mng = m_timer_mng;
812 bi.thread_mng = m_thread_mng;
813 bi.trace_mng = m_trace;
814 bi.world_parallel_mng = m_world_parallel_mng;
815 bi.mpi_lock = m_mpi_lock;
826_createSubParallelMngRef(Int32 color, Int32 key)
829 color = MPI_UNDEFINED;
830 MPI_Comm sub_communicator = MPI_COMM_NULL;
831 MPI_Comm_split(m_communicator, color, key, &sub_communicator);
832 IParallelMng* sub_pm = _createSubParallelMng(sub_communicator);
842 MPI_Group mpi_group = MPI_GROUP_NULL;
843 MPI_Comm_group(m_communicator, &mpi_group);
845 UniqueArray<int> mpi_kept_ranks(nb_sub_rank);
846 for (
Integer i = 0; i < nb_sub_rank; ++i)
847 mpi_kept_ranks[i] = (
int)kept_ranks[i];
849 MPI_Group final_group = MPI_GROUP_NULL;
850 MPI_Group_incl(mpi_group, nb_sub_rank, mpi_kept_ranks.data(), &final_group);
851 MPI_Comm sub_communicator = MPI_COMM_NULL;
853 MPI_Comm_create(m_communicator, final_group, &sub_communicator);
854 MPI_Group_free(&final_group);
855 return _createSubParallelMng(sub_communicator);
868:
public MpiRequestList
870 using Base = MpiRequestList;
872 explicit RequestList(MpiParallelMng* pm)
873 : Base(pm->m_adapter), m_parallel_mng(pm){}
878 m_parallel_mng->_checkFinishedSubRequests();
881 MpiParallelMng* m_parallel_mng;
899 return m_utils_factory;
906_isAcceleratorAware()
const
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Vue modifiable d'un tableau d'un type T.
constexpr Integer size() const noexcept
Retourne la taille du tableau.
const T * data() const
Accès à la racine du tableau hors toute protection.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Interface d'une famille d'entités.
virtual ItemGroup allItems() const =0
Groupe de toutes les entités.
Echange d'informations entre processeurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual ITraceMng * traceMng() const =0
Gestionnaire de traces.
virtual void build()=0
Construit l'instance.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface d'un sérialiseur.
Interface du gestionnaire de traces.
virtual void flush()=0
Flush tous les flots.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un communicateur MPI spécifique pour les synchronisations.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
eItemKind itemKind() const
Genre du groupe. Il s'agit du genre de ses éléments.
bool isAllItems() const
Indique si le groupe est celui de toutes les entités.
Interface d'une liste de messages de sérialisation.
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
void _wait(eWaitType wait_type) override
Effectue l'attente ou le test.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Message de réception.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Informations pour envoyer/recevoir un message point à point.
Message de sérialisation utilisant un BasicSerializer.
static MessageTag defaultTag()
Tag par défaut pour les messages de sérialisation.
Gère les MPI_Datatype associées aux types Arcane.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, const ItemGroup &group) override
Retourne une interface pour synchroniser des variables sur le groupe group.
Spécialisation de MpiRequestList pour MpiParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant MPI.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void barrier() override
Effectue une barière.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Bloque en attendant qu'une des requêtes rvalues soit terminée.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void build() override
Construit l'instance.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
bool m_is_initialized
true si déjà initialisé
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IThreadMng * threadMng() const override
Gestionnaire de threads.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void initialize() override
Initialise le gestionnaire du parallélisme.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
bool isParallel() const override
Retourne true si l'exécution est parallèle.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
IParallelReplication * replication() const override
Informations sur la réplication.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Test si une des requêtes rvalues est terminée.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
Gestionnaire du parallélisme utilisant MPI.
Gestionnaire de timer utisant la bibliothèque MPI.
void compute() override
Recalcule les infos de synchronisation.
Implémentation de Arccore::MessagePassing::IControlDispatcher.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeMetricCollector * timeMetricCollector() const override
Collecteur Arccore des statistiques temporelles (peut être nul)
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Chaîne de caractères unicode.
bool null() const
Retourne true si la chaîne est nulle.
Positionne la phase de l'action en cours d'exécution.
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
MPI_Comm communicator() const override
Récupère le communicateur spécifique de la topologie.
void compute(VariableSynchronizer *var_syncer) override
Calcul le communicateur spécifique.
Interface d'un service de synchronisation de variable.
void compute() override
Création de la liste des éléments de synchronisation.
Int32ConstArrayView communicatingRanks() override
Rangs des sous-domaines avec lesquels on communique.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
ARCANE_MPI_EXPORT bool arcaneIsAcceleratorAwareMPI()
Indique si le runtime actuel de MPI a le support des accélérateurs.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
@ IK_Cell
Entité de maillage de genre maille.
void arcaneCallFunctionAndTerminateIfThrow(std::function< void()> function)
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un MpiParallelMng.
Infos pour construire un SequentialParallelMng.