14#include "arcane/parallel/mpithread/HybridParallelMng.h"
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/NumericTypes.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/IThreadBarrier.h"
22#include "arcane/utils/ITraceMng.h"
24#include "arcane/core/IIOMng.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ISerializeMessageList.h"
27#include "arcane/core/IItemFamily.h"
28#include "arcane/core/internal/IParallelMngInternal.h"
29#include "arcane/core/internal/SerializeMessage.h"
30#include "arcane/core/parallel/IStat.h"
32#include "arcane/parallel/mpithread/HybridParallelDispatch.h"
33#include "arcane/parallel/mpithread/HybridMessageQueue.h"
34#include "arcane/parallel/mpi/MpiParallelMng.h"
36#include "arcane/impl/TimerMng.h"
37#include "arcane/impl/ParallelReplication.h"
38#include "arcane/impl/SequentialParallelMng.h"
39#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
42#include "arccore/message_passing/RequestListBase.h"
43#include "arccore/message_passing/SerializeMessageList.h"
62class HybridSerializeMessageList
66 class HybridSerializeMessageRequest
70 : m_message(message), m_request(request){}
79 : m_parallel_mng(mpm), m_trace(mpm->traceMng())
87 m_messages_to_process.add(msg);
97 case Parallel::WaitAll:
100 _wait(Parallel::WaitAll);
101 m_messages_to_process.clear();
124void HybridSerializeMessageList::
127 m_trace->
info() <<
"BEGIN PROCESS MESSAGES";
147 if (wait_mode==Parallel::WaitAll)
148 message_queue->waitAll(all_requests);
150 for( ISerializeMessage* sm : messages )
151 sm->setFinished(
true);
162: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(bi.local_rank,bi.local_nb_rank))
163, m_trace(bi.trace_mng)
164, m_thread_mng(bi.thread_mng)
165, m_world_parallel_mng(bi.world_parallel_mng)
167, m_timer_mng(nullptr)
168, m_replication(new ParallelReplication())
169, m_message_queue(new
HybridMessageQueue(bi.message_queue,bi.mpi_parallel_mng,bi.local_nb_rank))
170, m_is_initialized(false)
172, m_thread_barrier(bi.thread_barrier)
173, m_mpi_parallel_mng(bi.mpi_parallel_mng)
174, m_all_dispatchers(bi.all_dispatchers)
175, m_sub_builder_factory(bi.sub_builder_factory)
176, m_parent_container_ref(bi.container)
177, m_utils_factory(
createRef<ParallelMngUtilsFactoryBase>())
179 if (!m_world_parallel_mng)
180 m_world_parallel_mng =
this;
184 m_local_rank = bi.local_rank;
185 m_local_nb_rank = bi.local_nb_rank;
187 Int32 mpi_rank = m_mpi_parallel_mng->commRank();
188 Int32 mpi_size = m_mpi_parallel_mng->commSize();
190 m_global_rank = m_local_rank + mpi_rank * m_local_nb_rank;
191 m_global_nb_rank = mpi_size * m_local_nb_rank;
193 m_is_parallel = m_global_nb_rank!=1;
202 m_sequential_parallel_mng.reset();
203 delete m_replication;
205 delete m_message_queue;
208 delete m_mpi_parallel_mng;
220 DispatchCreator(ITraceMng* tm,HybridParallelMng* mpm,HybridMessageQueue* message_queue,MpiThreadAllDispatcher* all_dispatchers)
221 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue), m_all_dispatchers(all_dispatchers){}
223 template<
typename DataType> HybridParallelDispatch<DataType>*
226 HybridMessageQueue* tmq = m_message_queue;
227 MpiThreadAllDispatcher* ad = m_all_dispatchers;
228 auto field = ad->instance((DataType*)
nullptr).view();
229 return new HybridParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
233 HybridParallelMng* m_mpm;
234 HybridMessageQueue* m_message_queue;
235 MpiThreadAllDispatcher* m_all_dispatchers;
246 tm->
info() <<
"Initialise HybridParallelMng"
249 <<
" mpi_rank=" << m_mpi_parallel_mng->commRank();
259 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
262 DispatchCreator creator(m_trace,
this,m_message_queue,m_all_dispatchers);
263 this->createDispatchers(creator);
264 m_io_mng = arcaneCreateIOMng(
this);
278 m_trace->warning() <<
"HybridParallelMng already initialized";
303 return m_utils_factory->createGetVariablesValuesOperation(
this)._release();
309 return m_utils_factory->createTransferValuesOperation(
this)._release();
315 return m_utils_factory->createExchanger(
this)._release();
324void HybridParallelMng::
327 auto p2p_message =
buildMessage(rank,Parallel::NonBlocking);
328 Request r = m_message_queue->addSend(p2p_message,s);
335auto HybridParallelMng::
338 ARCANE_UNUSED(bytes);
339 auto p2p_message =
buildMessage(rank,Parallel::NonBlocking);
340 return m_message_queue->addSend(p2p_message,s);
349 return m_utils_factory->createSendSerializeMessage(
this, rank)._release();
355void HybridParallelMng::
361 bool is_broadcaster = (rank==
commRank());
370 Int64 total_size = sbuf->totalSize();
376 Int64 total_size = 0;
378 sbuf->preallocate(total_size);
381 sbuf->setFromSizes();
388void HybridParallelMng::
389recvSerializer(ISerializer* s,
Int32 rank)
391 auto p2p_message =
buildMessage(rank,Parallel::NonBlocking);
392 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
393 m_message_queue->waitAll(ArrayView<Request>(1,&r));
402 return m_utils_factory->createReceiveSerializeMessage(
this, rank)._release();
411 ARCANE_UNUSED(requests);
423 return m_message_queue->probe(p2p_message);
434 return m_message_queue->legacyProbe(p2p_message);
444 return m_message_queue->addSend(p2p_message,s);
464 m_stat->print(m_trace);
473 m_thread_barrier->wait();
475 m_mpi_parallel_mng->barrier();
476 m_thread_barrier->wait();
483_createSerializeMessageList()
486 x->setAllowAnyRankReceive(
false);
496 return m_utils_factory->createSynchronizer(
this,family)._release();
505 return m_utils_factory->createSynchronizer(
this,group)._release();
514 return m_utils_factory->createTopology(
this)._release();
523 return m_replication;
532 delete m_replication;
542 return m_sequential_parallel_mng.get();
549sequentialParallelMngRef()
551 return m_sequential_parallel_mng;
564 RequestList(HybridParallelMng* pm)
565 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
566 m_local_rank(m_parallel_mng->localRank()) {}
571 case Parallel::WaitAll:
572 m_parallel_mng->m_message_queue->waitAll(_requests());
575 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),
false);
578 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),
true);
582 HybridParallelMng* m_parallel_mng;
603 m_message_queue->waitAll(requests);
612 return m_mpi_parallel_mng->getMPICommunicator();
621 return m_mpi_parallel_mng->communicator();
647IParallelMng* HybridParallelMng::
650 ARCANE_UNUSED(kept_ranks);
651 ARCANE_THROW(NotSupportedException,
"Use createSubParallelMngRef() instead");
663 if (kept_ranks.
empty())
667 m_trace->info() <<
"CREATE SUB_PARALLEL_MNG_REF";
687 Int32 my_new_global_rank = (-1);
688 Int32 new_local_nb_rank = 0;
689 Int32 my_new_local_rank = (-1);
690 for(
Integer i=0; i<nb_kept_rank; ++i ){
691 Int32 kept_rank = kept_ranks[i];
692 if (kept_rank>=first_global_rank_in_this_mpi && kept_rank<last_global_rank_in_this_mpi)
695 my_new_global_rank = i;
696 my_new_local_rank = new_local_nb_rank - 1;
699 bool has_new_rank = (my_new_global_rank != (-1));
710 Int32 min_new_local_nb_rank = -1;
711 Int32 max_new_local_nb_rank = -1;
712 Int32 sum_new_local_nb_rank = -1;
713 Int32 min_rank = A_NULL_RANK;
714 Int32 max_rank = A_NULL_RANK;
715 computeMinMaxSum(new_local_nb_rank,min_new_local_nb_rank,max_new_local_nb_rank,
716 sum_new_local_nb_rank,min_rank,max_rank);
718 m_trace->info() <<
"CREATE SUB_PARALLEL_MNG_REF new_local_nb_rank=" << new_local_nb_rank
719 <<
" min=" << min_new_local_nb_rank
720 <<
" max=" << max_new_local_nb_rank
721 <<
" sum=" << sum_new_local_nb_rank
722 <<
" new_global_rank=" << my_new_global_rank;
726 if (max_new_local_nb_rank==1){
727 Integer nb_mpi_rank = m_mpi_parallel_mng->commSize();
737 bool do_mpi_call =
false;
738 if (min_new_local_nb_rank==1){
741 kept_mpi_ranks.
resize(nb_mpi_rank);
742 for(
Int32 x=0; x<nb_mpi_rank; ++x )
743 kept_mpi_ranks[x] = x;
752 Int16 v = (has_new_rank) ? 1 : 0;
755 for(
Int32 x=0; x<nb_mpi_rank; ++x )
756 if (gathered_ranks[x]==1)
757 kept_mpi_ranks.
add(x);
760 return m_mpi_parallel_mng->createSubParallelMngRef(kept_mpi_ranks);
765 if (max_new_local_nb_rank!=new_local_nb_rank)
766 ARCANE_FATAL(
"Not same number of new local ranks on every MPI processus: current={0} max={1}",
767 new_local_nb_rank,max_new_local_nb_rank);
769 if (max_new_local_nb_rank<2)
770 ARCANE_FATAL(
"number of local ranks is too low current={0} minimum=2",new_local_nb_rank);
773 m_thread_barrier->wait();
785 builder = m_sub_builder_factory->_createParallelMngBuilder(new_local_nb_rank,c);
787 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
790 m_thread_barrier->wait();
792 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
796 if (my_new_local_rank>=0){
797 new_parallel_mng = builder->_createParallelMng(my_new_local_rank,
traceMng());
799 m_thread_barrier->wait();
805 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
807 m_thread_barrier->wait();
809 return new_parallel_mng;
818 return m_utils_factory;
824bool HybridParallelMng::
825_isAcceleratorAware()
const
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Exception lorsqu'un argument est invalide.
Vue modifiable d'un tableau d'un type T.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
constexpr bool empty() const noexcept
true si le tableau est vide (size()==0)
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Interface d'une famille d'entités.
Echange d'informations entre processeurs.
virtual bool isAcceleratorAware() const =0
Indique si l'implémentation gère les accélérateurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface d'un sérialiseur.
Interface du gestionnaire de traces.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Interface d'une file de messages avec les threads.
Implémentation de IRequestList pour HybridParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant les threads.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
void freeRequests(ArrayView< Request > requests) override
Libère les requêtes.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void barrier() override
Effectue une barière.
MP::Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
bool m_is_initialized
true si déjà initialisé
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void build() override
Construit l'instance.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
void initialize() override
Initialise le gestionnaire du parallélisme.
IThreadMng * threadMng() const override
Gestionnaire de threads.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
Int32 m_local_rank
Rang local du processeur actuel.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
IParallelReplication * replication() const override
Informations sur la réplication.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Int32 m_local_nb_rank
Nombre de rang locaux.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
Int32 commRank() const override
Rang de cette instance dans le communicateur.
Int32 m_global_rank
Numéro du processeur actuel.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Integer waitMessages(Parallel::eWaitType wait_type) override
Attend que les messages aient terminé leur exécution.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
Interface du gestionnaire des échanges de messages.
Liste de requête de messages.
Interface d'une liste de messages de sérialisation.
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
void setEmiterRank(MessageRank rank)
Positionne le rang de l'émetteur du message.
Informations des buffers de réception.
Informations des buffers d'envoie.
Classe de base d'une liste de requêtes.
Exception lorsqu'une fonction n'est pas implémentée.
IParallelMngInternal * _internalApi() override
API interne à Arcane.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Implémentation d'un tampon pour la sérialisation.
Vue d'un tableau d'éléments de type T.
Positionne la phase de l'action en cours d'exécution.
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
C void mpBroadcast(IMessagePassingMng *pm, Span< char > send_buf, Int32 rank)
@ WaitSome
Attend que tous les messages de la liste soient traités.
eBlockingType
Type indiquant si un message est bloquant ou non.
IStat * createDefaultStat()
Créé une instance par défaut.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
std::int16_t Int16
Type entier signé sur 16 bits.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un HybridParallelMng.
Infos pour construire un SequentialParallelMng.