14#include "arcane/parallel/mpithread/HybridParallelMng.h"
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/NumericTypes.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/IThreadBarrier.h"
22#include "arcane/utils/ITraceMng.h"
24#include "arcane/core/parallel/IStat.h"
26#include "arcane/parallel/mpithread/HybridParallelDispatch.h"
27#include "arcane/parallel/mpithread/HybridMessageQueue.h"
28#include "arcane/parallel/mpi/MpiParallelMng.h"
30#include "arcane/core/SerializeMessage.h"
31#include "arcane/core/IIOMng.h"
32#include "arcane/core/Timer.h"
33#include "arcane/core/ISerializeMessageList.h"
34#include "arcane/core/IItemFamily.h"
35#include "arcane/core/internal/IParallelMngInternal.h"
37#include "arcane/impl/TimerMng.h"
38#include "arcane/impl/ParallelReplication.h"
39#include "arcane/impl/SequentialParallelMng.h"
40#include "arcane/impl/ParallelMngUtilsFactoryBase.h"
43#include "arccore/message_passing/RequestListBase.h"
44#include "arccore/message_passing/SerializeMessageList.h"
52arcaneCreateIOMng(IParallelMng* psm);
71 : m_message(message), m_request(request){}
80 : m_parallel_mng(
mpm), m_trace(
mpm->traceMng())
88 m_messages_to_process.add(
msg);
98 case Parallel::WaitAll:
101 _wait(Parallel::WaitAll);
102 m_messages_to_process.clear();
104 case Parallel::WaitSome:
106 case Parallel::WaitSomeNonBlocking:
125void HybridSerializeMessageList::
128 m_trace->
info() <<
"BEGIN PROCESS MESSAGES";
132 HybridMessageQueue* message_queue = m_parallel_mng->m_message_queue;
144 r = message_queue->addReceive(
message_info,ReceiveBufferInfo(s));
148 if (wait_mode==Parallel::WaitAll)
149 message_queue->waitAll(all_requests);
151 for( ISerializeMessage* sm : messages )
152 sm->setFinished(true);
162HybridParallelMng(
const HybridParallelMngBuildInfo& bi)
163: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.local_rank,bi.local_nb_rank))
164, m_trace(bi.trace_mng)
165, m_thread_mng(bi.thread_mng)
166, m_world_parallel_mng(bi.world_parallel_mng)
168, m_timer_mng(nullptr)
169, m_replication(new ParallelReplication())
170, m_message_queue(new HybridMessageQueue(bi.message_queue,bi.mpi_parallel_mng,bi.local_nb_rank))
171, m_is_initialized(false)
173, m_thread_barrier(bi.thread_barrier)
174, m_mpi_parallel_mng(bi.mpi_parallel_mng)
175, m_all_dispatchers(bi.all_dispatchers)
176, m_sub_builder_factory(bi.sub_builder_factory)
177, m_parent_container_ref(bi.container)
178, m_utils_factory(
createRef<ParallelMngUtilsFactoryBase>())
180 if (!m_world_parallel_mng)
181 m_world_parallel_mng =
this;
185 m_local_rank = bi.local_rank;
186 m_local_nb_rank = bi.local_nb_rank;
188 Int32 mpi_rank = m_mpi_parallel_mng->commRank();
189 Int32 mpi_size = m_mpi_parallel_mng->commSize();
191 m_global_rank = m_local_rank + mpi_rank * m_local_nb_rank;
192 m_global_nb_rank = mpi_size * m_local_nb_rank;
194 m_is_parallel = m_global_nb_rank!=1;
203 m_sequential_parallel_mng.reset();
204 delete m_replication;
206 delete m_message_queue;
209 delete m_mpi_parallel_mng;
221 DispatchCreator(ITraceMng* tm,HybridParallelMng* mpm,HybridMessageQueue* message_queue,MpiThreadAllDispatcher* all_dispatchers)
222 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue), m_all_dispatchers(all_dispatchers){}
224 template<
typename DataType> HybridParallelDispatch<DataType>*
227 HybridMessageQueue* tmq = m_message_queue;
228 MpiThreadAllDispatcher* ad = m_all_dispatchers;
229 auto field = ad->instance((DataType*)
nullptr).view();
230 return new HybridParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
234 HybridParallelMng* m_mpm;
235 HybridMessageQueue* m_message_queue;
236 MpiThreadAllDispatcher* m_all_dispatchers;
247 tm->
info() <<
"Initialise HybridParallelMng"
250 <<
" mpi_rank=" << m_mpi_parallel_mng->
commRank();
260 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(
bi);
263 DispatchCreator creator(m_trace,
this,m_message_queue,m_all_dispatchers);
264 this->createDispatchers(creator);
265 m_io_mng = arcaneCreateIOMng(
this);
279 m_trace->
warning() <<
"HybridParallelMng already initialized";
304 return m_utils_factory->createGetVariablesValuesOperation(
this)._release();
310 return m_utils_factory->createTransferValuesOperation(
this)._release();
316 return m_utils_factory->createExchanger(
this)._release();
325void HybridParallelMng::
336auto HybridParallelMng::
339 ARCANE_UNUSED(bytes);
340 auto p2p_message = buildMessage(rank,Parallel::NonBlocking);
356void HybridParallelMng::
382 sbuf->setFromSizes();
389void HybridParallelMng::
390recvSerializer(ISerializer* s,
Int32 rank)
392 auto p2p_message =
buildMessage(rank,Parallel::NonBlocking);
393 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
394 m_message_queue->waitAll(ArrayView<Request>(1,&r));
412 ARCANE_UNUSED(requests);
455 return m_message_queue->addReceive(
p2p_message,ReceiveBufferInfo(s));
465 m_stat->
print(m_trace);
474 m_thread_barrier->
wait();
477 m_thread_barrier->
wait();
484_createSerializeMessageList()
487 x->setAllowAnyRankReceive(
false);
497 return m_utils_factory->createSynchronizer(
this,family)._release();
506 return m_utils_factory->createSynchronizer(
this,group)._release();
515 return m_utils_factory->createTopology(
this)._release();
524 return m_replication;
533 delete m_replication;
543 return m_sequential_parallel_mng.get();
550sequentialParallelMngRef()
552 return m_sequential_parallel_mng;
566 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
567 m_local_rank(m_parallel_mng->localRank()) {}
572 case Parallel::WaitAll:
573 m_parallel_mng->m_message_queue->waitAll(_requests());
575 case Parallel::WaitSome:
576 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),
false);
578 case Parallel::WaitSomeNonBlocking:
579 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),
true);
604 m_message_queue->waitAll(requests);
668 m_trace->
info() <<
"CREATE SUB_PARALLEL_MNG_REF";
767 ARCANE_FATAL(
"Not same number of new local ranks on every MPI processus: current={0} max={1}",
774 m_thread_barrier->
wait();
788 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
791 m_thread_barrier->
wait();
793 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
800 m_thread_barrier->
wait();
806 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
808 m_thread_barrier->
wait();
819 return m_utils_factory;
825bool HybridParallelMng::
826_isAcceleratorAware()
const
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Tableau d'items de types quelconques.
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface d'une famille d'entités.
Echange d'informations entre processeurs.
virtual Ref< IParallelMngContainer > _createParallelMngBuilder(Int32 nb_local_rank, Parallel::Communicator communicator)=0
Créé un conteneur pour nb_local_rank rangs locaux et avec comme communicateur communicator.
virtual bool isAcceleratorAware() const =0
Indique si l'implémentation gère les accélérateurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Interface d'une file de messages avec les threads.
Implémentation de IRequestList pour HybridParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant les threads.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
void freeRequests(ArrayView< Request > requests) override
Libère les requêtes.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void barrier() override
Effectue une barière.
MP::Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
bool m_is_initialized
true si déjà initialisé
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void build() override
Construit l'instance.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
void initialize() override
Initialise le gestionnaire du parallélisme.
IThreadMng * threadMng() const override
Gestionnaire de threads.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
Int32 m_local_rank
Rang local du processeur actuel.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
IParallelReplication * replication() const override
Informations sur la réplication.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Int32 m_local_nb_rank
Nombre de rang locaux.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
Int32 commRank() const override
Rang de cette instance dans le communicateur.
Int32 m_global_rank
Numéro du processeur actuel.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Integer waitMessages(Parallel::eWaitType wait_type) override
Attend que les messages aient terminé leur exécution.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
void barrier() override
Effectue une barière.
Int32 commRank() const override
Rang de cette instance dans le communicateur.
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
Int32 commSize() const override
Nombre d'instance dans le communicateur.
void allGather(ISerializer *send_serializer, ISerializer *recv_serializer) override
Redéfinit ici allGather pour éviter de cacher le symbole dans les classes dérivées.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
IParallelMngInternal * _internalApi() override
API interne à Arcane.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
virtual void print(ITraceMng *trace)=0
Imprime sur trace les statistiques.
Implémentation d'un tampon pour la sérialisation.
Message utilisant un SerializeBuffer.
Positionne la phase de l'action en cours d'exécution.
Exception lorsqu'un argument est invalide.
Vue constante d'un tableau de type T.
Interface d'un sérialiseur.
virtual bool wait()=0
Bloque et attend que tous les threads appellent cette méthode.
Interface du gestionnaire de traces.
virtual TraceMessage warning()=0
Flot pour un message d'avertissement.
virtual TraceMessage info()=0
Flot pour un message d'information.
Communicateur pour l'échange de message.
Interface du gestionnaire des échanges de messages.
Liste de requête de messages.
Interface d'une liste de messages de sérialisation.
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
Classe de base d'une liste de requêtes.
Exception lorsqu'une fonction n'est pas implémentée.
Exception lorsqu'une opération n'est pas supportée.
TraceMessage info() const
Flot pour un message d'information.
Positionne une classe de message.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
IStat * createDefaultStat()
Créé une instance par défaut.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
eBlockingType
Type indiquant si un message est bloquant ou non.
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
Infos pour construire un SequentialParallelMng.