14#include "arcane/parallel/mpithread/HybridParallelMng.h"
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/NumericTypes.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/IThreadBarrier.h"
22#include "arcane/utils/ITraceMng.h"
24#include "arcane/core/IIOMng.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ISerializeMessageList.h"
27#include "arcane/core/IItemFamily.h"
28#include "arcane/core/IParallelTopology.h"
29#include "arcane/core/internal/ParallelMngInternal.h"
30#include "arcane/core/internal/SerializeMessage.h"
31#include "arcane/core/internal/MachineShMemWinMemoryAllocator.h"
32#include "arcane/core/parallel/IStat.h"
34#include "arcane/parallel/mpithread/HybridParallelDispatch.h"
35#include "arcane/parallel/mpithread/HybridMessageQueue.h"
36#include "arcane/parallel/mpithread/internal/HybridMachineShMemWinBaseInternalCreator.h"
37#include "arcane/parallel/mpithread/internal/HybridContigMachineShMemWinBaseInternal.h"
38#include "arcane/parallel/mpithread/internal/HybridMachineShMemWinBaseInternal.h"
40#include "arcane/parallel/mpi/MpiParallelMng.h"
42#include "arcane/impl/TimerMng.h"
43#include "arcane/impl/ParallelReplication.h"
44#include "arcane/impl/SequentialParallelMng.h"
45#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
48#include "arccore/message_passing/RequestListBase.h"
49#include "arccore/message_passing/internal/SerializeMessageList.h"
68class HybridSerializeMessageList
72 class HybridSerializeMessageRequest
76 : m_message(message), m_request(request){}
85 : m_parallel_mng(mpm), m_trace(mpm->traceMng())
93 m_messages_to_process.add(msg);
103 case Parallel::WaitAll:
106 _wait(Parallel::WaitAll);
107 m_messages_to_process.clear();
130void HybridSerializeMessageList::
133 m_trace->
info() <<
"BEGIN PROCESS MESSAGES";
153 if (wait_mode==Parallel::WaitAll)
154 message_queue->waitAll(all_requests);
156 for( ISerializeMessage* sm : messages )
157 sm->setFinished(
true);
167:
public ParallelMngInternal
172 : ParallelMngInternal(pm)
174 , m_window_creator(window_creator)
178 ~Impl()
override =
default;
184 return m_parallel_mng->m_mpi_parallel_mng->commRank() * m_parallel_mng->m_local_nb_rank;
188 return m_parallel_mng->m_local_nb_rank;
193 m_parallel_mng->traceMng()->debug() <<
"initializeWindowCreator() Hybrid";
194 m_window_creator->initializeMpiWindowCreator(m_parallel_mng->commRank(), m_parallel_mng->mpiParallelMng());
199 if (m_shmem_available == 1) {
203 if (m_shmem_available == 0) {
205 if (topo->machineRanks().size() == m_window_creator->machineRanks().size()) {
206 m_shmem_available = 1;
210 m_shmem_available = 2;
219 return makeRef(m_window_creator->createWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
224 return makeRef(m_window_creator->createDynamicWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
234 return m_window_creator->machineRanks();
239 m_window_creator->machineBarrier(m_parallel_mng->commRank(), m_parallel_mng->mpiParallelMng());
244 HybridParallelMng* m_parallel_mng;
251 Int8 m_shmem_available = 0;
263, m_trace(bi.trace_mng)
264, m_thread_mng(bi.thread_mng)
265, m_world_parallel_mng(bi.world_parallel_mng)
267, m_timer_mng(nullptr)
269, m_message_queue(new
HybridMessageQueue(bi.message_queue,bi.mpi_parallel_mng,bi.local_nb_rank))
270, m_is_initialized(false)
271, m_stat(
Parallel::createDefaultStat())
272, m_thread_barrier(bi.thread_barrier)
273, m_mpi_parallel_mng(bi.mpi_parallel_mng)
274, m_all_dispatchers(bi.all_dispatchers)
275, m_sub_builder_factory(bi.sub_builder_factory)
276, m_parent_container_ref(bi.container)
278, m_parallel_mng_internal(new Impl(this, bi.window_creator))
280 if (!m_world_parallel_mng)
281 m_world_parallel_mng =
this;
288 Int32 mpi_rank = m_mpi_parallel_mng->commRank();
289 Int32 mpi_size = m_mpi_parallel_mng->commSize();
303 m_sequential_parallel_mng.reset();
304 delete m_replication;
306 delete m_message_queue;
309 delete m_mpi_parallel_mng;
310 delete m_parallel_mng_internal;
322 DispatchCreator(ITraceMng* tm,HybridParallelMng* mpm,HybridMessageQueue* message_queue,MpiThreadAllDispatcher* all_dispatchers)
323 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue), m_all_dispatchers(all_dispatchers){}
325 template<
typename DataType> HybridParallelDispatch<DataType>*
328 HybridMessageQueue* tmq = m_message_queue;
329 MpiThreadAllDispatcher* ad = m_all_dispatchers;
330 auto field = ad->instance((DataType*)
nullptr).view();
331 return new HybridParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
335 HybridParallelMng* m_mpm;
336 HybridMessageQueue* m_message_queue;
337 MpiThreadAllDispatcher* m_all_dispatchers;
348 tm->
info() <<
"Initialise HybridParallelMng"
351 <<
" mpi_rank=" << m_mpi_parallel_mng->commRank();
361 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
364 DispatchCreator creator(m_trace,
this,m_message_queue,m_all_dispatchers);
365 this->createDispatchers(creator);
366 m_io_mng = arcaneCreateIOMng(
this);
368 m_parallel_mng_internal->initializeWindowCreator();
382 m_trace->warning() <<
"HybridParallelMng already initialized";
407 return m_utils_factory->createGetVariablesValuesOperation(
this)._release();
413 return m_utils_factory->createTransferValuesOperation(
this)._release();
419 return m_utils_factory->createExchanger(
this)._release();
428void HybridParallelMng::
431 auto p2p_message =
buildMessage(rank,Parallel::NonBlocking);
432 Request r = m_message_queue->addSend(p2p_message,s);
439auto HybridParallelMng::
442 ARCANE_UNUSED(bytes);
443 auto p2p_message =
buildMessage(rank,Parallel::NonBlocking);
444 return m_message_queue->addSend(p2p_message,s);
453 return m_utils_factory->createSendSerializeMessage(
this, rank)._release();
459void HybridParallelMng::
465 bool is_broadcaster = (rank==
commRank());
474 Int64 total_size = sbuf->totalSize();
480 Int64 total_size = 0;
482 sbuf->preallocate(total_size);
485 sbuf->setFromSizes();
492void HybridParallelMng::
493recvSerializer(ISerializer* s,
Int32 rank)
495 auto p2p_message =
buildMessage(rank,Parallel::NonBlocking);
496 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(s));
497 m_message_queue->waitAll(ArrayView<Request>(1,&r));
506 return m_utils_factory->createReceiveSerializeMessage(
this, rank)._release();
515 ARCANE_UNUSED(requests);
527 return m_message_queue->probe(p2p_message);
538 return m_message_queue->legacyProbe(p2p_message);
548 return m_message_queue->addSend(p2p_message,s);
568 m_stat->print(m_trace);
577 m_thread_barrier->wait();
579 m_mpi_parallel_mng->barrier();
580 m_thread_barrier->wait();
587_createSerializeMessageList()
590 x->setAllowAnyRankReceive(
false);
600 return m_utils_factory->createSynchronizer(
this,family)._release();
609 return m_utils_factory->createSynchronizer(
this,group)._release();
618 return m_utils_factory->createTopology(
this)._release();
627 return m_replication;
636 delete m_replication;
646 return m_sequential_parallel_mng.get();
653sequentialParallelMngRef()
655 return m_sequential_parallel_mng;
668 RequestList(HybridParallelMng* pm)
669 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
670 m_local_rank(m_parallel_mng->localRank()) {}
675 case Parallel::WaitAll:
676 m_parallel_mng->m_message_queue->waitAll(_requests());
679 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),
false);
682 m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),
true);
686 HybridParallelMng* m_parallel_mng;
707 m_message_queue->waitAll(requests);
716 return m_mpi_parallel_mng->getMPICommunicator();
725 return m_mpi_parallel_mng->communicator();
734 return m_mpi_parallel_mng->machineCommunicator();
760IParallelMng* HybridParallelMng::
763 ARCANE_UNUSED(kept_ranks);
764 ARCANE_THROW(NotSupportedException,
"Use createSubParallelMngRef() instead");
776 if (kept_ranks.
empty())
780 m_trace->info() <<
"CREATE SUB_PARALLEL_MNG_REF";
800 Int32 my_new_global_rank = (-1);
801 Int32 new_local_nb_rank = 0;
802 Int32 my_new_local_rank = (-1);
803 for(
Integer i=0; i<nb_kept_rank; ++i ){
804 Int32 kept_rank = kept_ranks[i];
805 if (kept_rank>=first_global_rank_in_this_mpi && kept_rank<last_global_rank_in_this_mpi)
808 my_new_global_rank = i;
809 my_new_local_rank = new_local_nb_rank - 1;
812 bool has_new_rank = (my_new_global_rank != (-1));
823 Int32 min_new_local_nb_rank = -1;
824 Int32 max_new_local_nb_rank = -1;
825 Int32 sum_new_local_nb_rank = -1;
826 Int32 min_rank = A_NULL_RANK;
827 Int32 max_rank = A_NULL_RANK;
828 computeMinMaxSum(new_local_nb_rank,min_new_local_nb_rank,max_new_local_nb_rank,
829 sum_new_local_nb_rank,min_rank,max_rank);
831 m_trace->info() <<
"CREATE SUB_PARALLEL_MNG_REF new_local_nb_rank=" << new_local_nb_rank
832 <<
" min=" << min_new_local_nb_rank
833 <<
" max=" << max_new_local_nb_rank
834 <<
" sum=" << sum_new_local_nb_rank
835 <<
" new_global_rank=" << my_new_global_rank;
839 if (max_new_local_nb_rank==1){
840 Integer nb_mpi_rank = m_mpi_parallel_mng->commSize();
850 bool do_mpi_call =
false;
851 if (min_new_local_nb_rank==1){
854 kept_mpi_ranks.
resize(nb_mpi_rank);
855 for(
Int32 x=0; x<nb_mpi_rank; ++x )
856 kept_mpi_ranks[x] = x;
865 Int16 v = (has_new_rank) ? 1 : 0;
868 for(
Int32 x=0; x<nb_mpi_rank; ++x )
869 if (gathered_ranks[x]==1)
870 kept_mpi_ranks.
add(x);
873 return m_mpi_parallel_mng->createSubParallelMngRef(kept_mpi_ranks);
878 if (max_new_local_nb_rank!=new_local_nb_rank)
879 ARCANE_FATAL(
"Not same number of new local ranks on every MPI processus: current={0} max={1}",
880 new_local_nb_rank,max_new_local_nb_rank);
882 if (max_new_local_nb_rank<2)
883 ARCANE_FATAL(
"number of local ranks is too low current={0} minimum=2",new_local_nb_rank);
886 m_thread_barrier->wait();
899 builder = m_sub_builder_factory->_createParallelMngBuilder(new_local_nb_rank, c, mc);
901 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
904 m_thread_barrier->wait();
906 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
910 if (my_new_local_rank>=0){
911 new_parallel_mng = builder->_createParallelMng(my_new_local_rank,
traceMng());
913 m_thread_barrier->wait();
919 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
921 m_thread_barrier->wait();
923 return new_parallel_mng;
932 return m_utils_factory;
938bool HybridParallelMng::
939_isAcceleratorAware()
const
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
Exception lorsqu'un argument est invalide.
Vue modifiable d'un tableau d'un type T.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
constexpr bool empty() const noexcept
true si le tableau est vide (size()==0)
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Interface d'une famille d'entités.
Échange d'informations entre processeurs.
virtual bool isAcceleratorAware() const =0
Indique si l'implémentation gère les accélérateurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface d'un sérialiseur.
Interface du gestionnaire de traces.
virtual TraceMessage info()=0
Flot pour un message d'information.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Options pour configurer les allocations.
Interface d'une file de messages avec les threads.
void machineBarrier() override
Méthode permettant de faire une barrière pour les sous-domaines du noeud de calcul.
Int32 masterParallelIORank() const override
void initializeWindowCreator() override
Méthode permettant d'initialiser le windowCreator spécifique à l'implémentation.
Int32 nbSendersToMasterParallelIO() const override
MemoryAllocationOptions machineShMemWinMemoryAllocator() override
Méthode permettant de récupérer un allocateur en mémoire partagée.
bool isMachineShMemWinAvailable() override
Méthode permettant de savoir si le mode mémoire partagée est supporté.
ConstArrayView< Int32 > machineRanks() override
Méthode permettant de récupérer les rangs des sous-domaines du noeud de calcul.
Ref< IContigMachineShMemWinBaseInternal > createContigMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Ref< IMachineShMemWinBaseInternal > createMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire dynamique sur le noeud.
Implémentation de IRequestList pour HybridParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant les threads.
IParallelMng * worldParallelMng() const override
Gestionnaire de parallélisme sur l'ensemble des ressources allouées.
void freeRequests(ArrayView< Request > requests) override
Libère les requêtes.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void barrier() override
Effectue une barière.
MP::Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
bool m_is_initialized
true si déjà initialisé
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
ITimerMng * timerMng() const override
Gestionnaire de timers.
void build() override
Construit l'instance.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
void initialize() override
Initialise le gestionnaire du parallélisme.
IThreadMng * threadMng() const override
Gestionnaire de threads.
Int32 m_global_nb_rank
Nombre de rangs globaux.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
Int32 m_local_rank
Rang local du processeur actuel.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
IParallelReplication * replication() const override
Informations sur la réplication.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
MP::Communicator machineCommunicator() const override
Communicateur MPI issus du communicateur communicator() réunissant tous les processus du noeud de cal...
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
Int32 m_local_nb_rank
Nombre de rang locaux.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
Int32 commRank() const override
Rang de cette instance dans le communicateur.
Int32 m_global_rank
Numéro du processeur actuel.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void processPendingMessages() override
Envoie les messages de la liste qui ne l'ont pas encore été.
Integer waitMessages(Parallel::eWaitType wait_type) override
Attend que les messages aient terminé leur exécution.
void addMessage(ISerializeMessage *msg) override
Ajoute un message à la liste.
Interface du gestionnaire des échanges de messages.
Liste de requête de messages.
Interface d'une liste de messages de sérialisation.
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
void setEmiterRank(MessageRank rank)
Positionne le rang de l'émetteur du message.
Informations des buffers de réception.
Informations des buffers d'envoie.
Classe de base d'une liste de requêtes.
IParallelMngInternal * _internalApi() override
API interne à Arcane.
Exception lorsqu'une fonction n'est pas implémentée.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Implémentation d'un tampon pour la sérialisation.
Vue d'un tableau d'éléments de type T.
Positionne la phase de l'action en cours d'exécution.
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
C void mpBroadcast(IMessagePassingMng *pm, Span< char > send_buf, Int32 rank)
@ WaitSome
Attend que tous les messages de la liste soient traités.
eBlockingType
Type indiquant si un message est bloquant ou non.
Implémentation de la concurrence.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
std::int8_t Int8
Type entier signé sur 8 bits.
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
std::int16_t Int16
Type entier signé sur 16 bits.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un HybridParallelMng.
Infos pour construire un SequentialParallelMng.