14#include "arcane/utils/Collection.h"
15#include "arcane/utils/Enumerator.h"
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/TimeoutException.h"
19#include "arcane/utils/NotImplementedException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/ITraceMng.h"
22#include "arcane/utils/ValueConvert.h"
23#include "arcane/utils/Exception.h"
24#include "arcane/utils/HPReal.h"
26#include "arcane/core/IIOMng.h"
27#include "arcane/core/Timer.h"
28#include "arcane/core/IItemFamily.h"
29#include "arcane/core/SerializeMessage.h"
30#include "arcane/core/parallel/IStat.h"
32#include "arcane/parallel/mpi/MpiParallelMng.h"
33#include "arcane/parallel/mpi/MpiAdapter.h"
34#include "arcane/parallel/mpi/MpiParallelDispatch.h"
35#include "arcane/parallel/mpi/MpiSerializeMessageList.h"
36#include "arcane/parallel/mpi/MpiTimerMng.h"
37#include "arcane/parallel/mpi/MpiLock.h"
38#include "arcane/parallel/mpi/MpiSerializeMessage.h"
39#include "arcane/parallel/mpi/MpiParallelNonBlockingCollective.h"
40#include "arcane/parallel/mpi/MpiDatatype.h"
41#include "arcane/parallel/mpi/IVariableSynchronizerMpiCommunicator.h"
43#include "arcane/impl/ParallelReplication.h"
44#include "arcane/impl/SequentialParallelMng.h"
45#include "arcane/impl/ParallelMngUtilsFactoryBase.h"
46#include "arcane/impl/internal/VariableSynchronizer.h"
48#include "arccore/message_passing_mpi/MpiMessagePassingMng.h"
49#include "arccore/message_passing_mpi/MpiRequestList.h"
50#include "arccore/message_passing_mpi/MpiSerializeDispatcher.h"
51#include "arccore/message_passing/Dispatchers.h"
53#include "arccore/message_passing/SerializeMessageList.h"
62using namespace Arccore::MessagePassing::Mpi;
70#if defined(ARCANE_HAS_MPI_NEIGHBOR)
88MpiParallelMngBuildInfo::
91, comm_rank(A_NULL_RANK)
98, is_mpi_comm_owned(
true)
101 ::MPI_Comm_rank(
comm,&comm_rank);
102 ::MPI_Comm_size(
comm,&comm_nb_rank);
105 MP::Mpi::MpiMessagePassingMng::BuildInfo
bi(comm_rank,comm_nb_rank,m_dispatchers_ref.get(),mpi_comm);
123 : m_mpi_parallel_mng(pm){}
126 _checkFreeCommunicator();
130 return m_topology_communicator;
146 _checkFreeCommunicator();
194 void _checkFreeCommunicator()
224 VariableSynchronizer::compute();
226 if (m_topology_info.get())
227 m_topology_info->compute(
this);
241 : m_synchronizer_version(2)
243 if (platform::getEnvironmentVariable(
"ARCANE_SYNCHRONIZE_VERSION")==
"1")
244 m_synchronizer_version = 1;
245 if (platform::getEnvironmentVariable(
"ARCANE_SYNCHRONIZE_VERSION")==
"2")
246 m_synchronizer_version = 2;
247 if (platform::getEnvironmentVariable(
"ARCANE_SYNCHRONIZE_VERSION")==
"3")
248 m_synchronizer_version = 3;
249 if (platform::getEnvironmentVariable(
"ARCANE_SYNCHRONIZE_VERSION")==
"4"){
250 m_synchronizer_version = 4;
251 String v = platform::getEnvironmentVariable(
"ARCANE_SYNCHRONIZE_BLOCK_SIZE");
253 Int32 block_size = 0;
254 if (!builtInGetValue(block_size,v))
255 m_synchronize_block_size = block_size;
256 m_synchronize_block_size = std::clamp(m_synchronize_block_size,0,1000000000);
258 v = platform::getEnvironmentVariable(
"ARCANE_SYNCHRONIZE_NB_SEQUENCE");
263 m_synchronize_nb_sequence = std::clamp(m_synchronize_nb_sequence,1,1024*1024);
266 if (platform::getEnvironmentVariable(
"ARCANE_SYNCHRONIZE_VERSION")==
"5")
267 m_synchronizer_version = 5;
273 return _createSynchronizer(pm,family->
allItems());
278 return _createSynchronizer(pm,group);
292 if (m_synchronizer_version == 2){
294 tm->
info() <<
"Using MpiSynchronizer V2";
297 else if (m_synchronizer_version == 3 ){
299 tm->
info() <<
"Using MpiSynchronizer V3";
302 else if (m_synchronizer_version == 4){
304 tm->
info() <<
"Using MpiSynchronizer V4 block_size=" << m_synchronize_block_size
305 <<
" nb_sequence=" << m_synchronize_nb_sequence;
306 generic_factory = arcaneCreateMpiBlockVariableSynchronizerFactory(
mpi_pm,m_synchronize_block_size,m_synchronize_nb_sequence);
308 else if (m_synchronizer_version == 5){
310 tm->
info() <<
"Using MpiSynchronizer V5";
312#if defined(ARCANE_HAS_MPI_NEIGHBOR)
315 throw NotSupportedException(A_FUNCINFO,
"Synchronize implementation V5 is not supported with this version of MPI");
320 tm->
info() <<
"Using MpiSynchronizer V1";
330 Integer m_synchronizer_version = 1;
331 Int32 m_synchronize_block_size = 32000;
332 Int32 m_synchronize_nb_sequence = 1;
344, m_trace(
bi.trace_mng)
345, m_thread_mng(
bi.thread_mng)
346, m_world_parallel_mng(
bi.world_parallel_mng)
347, m_timer_mng(
bi.timer_mng)
349, m_is_parallel(
bi.is_parallel)
350, m_comm_rank(
bi.commRank())
351, m_comm_size(
bi.commSize())
353, m_communicator(
bi.mpiComm())
354, m_is_communicator_owned(
bi.is_mpi_comm_owned)
355, m_mpi_lock(
bi.mpi_lock)
356, m_non_blocking_collective(
nullptr)
359 if (!m_world_parallel_mng){
360 m_trace->debug()<<
"[MpiParallelMng] No m_world_parallel_mng found, reverting to ourselves!";
361 m_world_parallel_mng =
this;
371 delete m_non_blocking_collective;
372 m_sequential_parallel_mng.reset();
373 if (m_is_communicator_owned){
377 delete m_replication;
379 if (m_is_timer_owned)
382 delete m_datatype_list;
416class ControlDispatcherDecorator
426 return m_adapter->commSplit(
keep);
428 MP::IProfiler* profiler()
const override {
return m_adapter->profiler(); }
429 void setProfiler(
MP::IProfiler*
p)
override { m_adapter->setProfiler(
p); }
445 m_is_timer_owned =
true;
454 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(
bi);
460 if (platform::getEnvironmentVariable(
"ARCANE_ORDERED_REDUCE")==
"TRUE")
466 MpiAdapter* adapter =
new MpiAdapter(m_trace,m_stat->toArccoreStat(),m_communicator,m_mpi_lock);
468 auto mpm = _messagePassingMng();
479 DispatchCreator creator(m_trace,
mpm,m_adapter,m_datatype_list);
480 this->createDispatchers(creator);
482 m_io_mng = arcaneCreateIOMng(
this);
485 m_non_blocking_collective->
build();
487 m_trace->
info() <<
"Using mpi with locks.";
491 if (platform::getEnvironmentVariable(
"ARCANE_SYNCHRONIZE_LIST_VERSION") ==
"1") {
492 m_use_serialize_list_v2 =
false;
493 m_trace->
info() <<
"Using MPI SerializeList version 1";
508 m_trace->
warning() <<
"MpiParallelMng already initialized";
512 m_trace->
info() <<
"Initialisation de MpiParallelMng";
513 m_sequential_parallel_mng->initialize();
529 m_mpi_serialize_dispatcher->legacySendSerializer(s,{
MessageRank(rank),
mpi_tag,Blocking});
551 return m_mpi_serialize_dispatcher->legacySendSerializer(s,{
MessageRank(rank),
mpi_tag,NonBlocking});
561 m_mpi_serialize_dispatcher->broadcastSerializer(values,
MessageRank(rank));
573 m_mpi_serialize_dispatcher->legacyReceiveSerializer(values,
MessageRank(rank),
mpi_tag);
592 return m_adapter->probeMessage(message);
601 return m_adapter->legacyProbeMessage(message);
628 for( Integer i=0, is=requests.size(); i<is; ++i )
629 m_adapter->freeRequest(requests[i]);
636_checkFinishedSubRequests()
638 m_mpi_serialize_dispatcher->checkFinishedSubRequests();
645sequentialParallelMngRef()
647 return m_sequential_parallel_mng;
653 return m_sequential_parallel_mng.get();
663 m_stat->
print(m_trace);
673 m_adapter->barrier();
682 m_adapter->waitAllRequests(requests);
683 _checkFinishedSubRequests();
692 return _waitSomeRequests(requests,
false);
701 return _waitSomeRequests(requests,
true);
714 for (
int i = 0 ; i < requests.size() ; i++) {
725_createSerializeMessageList()
727 if (m_use_serialize_list_v2)
738 return m_utils_factory->createGetVariablesValuesOperation(
this)._release();
747 return m_utils_factory->createTransferValuesOperation(
this)._release();
756 return m_utils_factory->createExchanger(
this)._release();
765 return m_utils_factory->createSynchronizer(
this,family)._release();
774 return m_utils_factory->createSynchronizer(
this,group)._release();
783 return m_utils_factory->createTopology(
this)._release();
792 return m_replication;
801 delete m_replication;
821 bi.timer_mng = m_timer_mng;
822 bi.thread_mng = m_thread_mng;
823 bi.trace_mng = m_trace;
824 bi.world_parallel_mng = m_world_parallel_mng;
825 bi.mpi_lock = m_mpi_lock;
836_createSubParallelMngRef(
Int32 color,
Int32 key)
882 :
Base(pm->m_adapter), m_parallel_mng(pm){}
887 m_parallel_mng->_checkFinishedSubRequests();
908 return m_utils_factory;
915_isAcceleratorAware()
const
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Tableau d'items de types quelconques.
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Interface d'une famille d'entités.
virtual ItemGroup allItems() const =0
Groupe de toutes les entités.
Echange d'informations entre processeurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual ITraceMng * traceMng() const =0
Gestionnaire de traces.
virtual Parallel::Communicator communicator() const =0
Communicateur MPI associé à ce gestionnaire.
virtual void build()=0
Construit l'instance.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Envoie de valeurs sur différents processeurs.
Interface d'un communicateur MPI spécifique pour les synchronisations.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
eItemKind itemKind() const
Genre du groupe. Il s'agit du genre de ses éléments.
bool isAllItems() const
Indique si le groupe est celui de toutes les entités.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Gère les MPI_Datatype associées aux types Arcane.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Ref< IVariableSynchronizer > createSynchronizer(IParallelMng *pm, const ItemGroup &group) override
Retourne une interface pour synchroniser des variables sur le groupe group.
Spécialisation de MpiRequestList pour MpiParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant MPI.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void barrier() override
Effectue une barière.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Bloque en attendant qu'une des requêtes rvalues soit terminée.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
void build() override
Construit l'instance.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
bool m_is_initialized
true si déjà initialisé
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IThreadMng * threadMng() const override
Gestionnaire de threads.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void initialize() override
Initialise le gestionnaire du parallélisme.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
bool isParallel() const override
Retourne true si l'exécution est parallèle.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
Int32 m_comm_rank
Numéro du processeur actuel.
Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
IParallelReplication * replication() const override
Informations sur la réplication.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Test si une des requêtes rvalues est terminée.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
Gestionnaire du parallélisme utilisant MPI.
Gestionnaire de timer utisant la bibliothèque MPI.
Synchronizer spécifique MPI.
void compute() override
Recalcule les infos de synchronisation.
Implémentation de Arccore::MessagePassing::IControlDispatcher.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeMetricCollector * timeMetricCollector() const override
Collecteur Arccore des statistiques temporelles (peut être nul)
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
virtual void print(ITraceMng *trace)=0
Imprime sur trace les statistiques.
Message utilisant un SerializeBuffer.
Positionne la phase de l'action en cours d'exécution.
Communicateur spécifique créé via MPI_Dist_graph_create_adjacent.
MPI_Comm communicator() const override
Récupère le communicateur spécifique de la topologie.
void compute(VariableSynchronizer *var_syncer) override
Calcul le communicateur spécifique.
Interface d'un service de synchronisation de variable.
Vue constante d'un tableau de type T.
Interface d'un sérialiseur.
Interface du gestionnaire de traces.
virtual TraceMessage warning()=0
Flot pour un message d'avertissement.
virtual TraceMessage info()=0
Flot pour un message d'information.
virtual void flush()=0
Flush tous les flots.
Interface du gestionnaire des échanges de messages.
Interface d'un profiler pour les échanges de messages.
Interface d'une liste de messages de sérialisation.
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
void destroy()
Détruit l'instance. Elle ne doit plus être utilisée par la suite.
Encapsulation d'un MPI_Datatype.
void _wait(eWaitType wait_type) override
Effectue l'attente ou le test.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Message de réception.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Implémentation MPI de la gestion des 'ISerializeMessage'.
Informations pour envoyer/recevoir un message point à point.
static MessageTag defaultTag()
Tag par défaut pour les messages de sérialisation.
Exception lorsqu'une opération n'est pas supportée.
Chaîne de caractères unicode.
bool null() const
Retourne true si la chaîne est nulle.
TraceMessage info() const
Flot pour un message d'information.
Positionne une classe de message.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
ARCANE_MPI_EXPORT bool arcaneIsAcceleratorAwareMPI()
Indique si le runtime actuel de MPI a le support des accélérateurs.
@ IK_Cell
Entité de maillage de genre maille.
void arcaneCallFunctionAndTerminateIfThrow(std::function< void()> function)
Int32 Integer
Type représentant un entier.
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
Infos pour construire un MpiParallelMng.
Infos pour construire un SequentialParallelMng.