14#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/PlatformUtils.h"
19#include "arcane/utils/TraceInfo.h"
20#include "arcane/utils/NumericTypes.h"
21#include "arcane/utils/ArgumentException.h"
22#include "arcane/utils/FatalErrorException.h"
23#include "arcane/utils/ITraceMng.h"
25#include "arcane/parallel/IStat.h"
27#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
28#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
29#include "arcane/parallel/thread/internal/SharedMemoryMachineShMemWinBaseInternalCreator.h"
30#include "arcane/parallel/thread/internal/SharedMemoryContigMachineShMemWinBaseInternal.h"
31#include "arcane/parallel/thread/internal/SharedMemoryMachineShMemWinBaseInternal.h"
33#include "arcane/core/Timer.h"
34#include "arcane/core/IIOMng.h"
35#include "arcane/core/ISerializeMessageList.h"
36#include "arcane/core/IItemFamily.h"
37#include "arcane/core/internal/SerializeMessage.h"
38#include "arcane/core/internal/ParallelMngInternal.h"
39#include "arcane/core/internal/MachineShMemWinMemoryAllocator.h"
41#include "arcane/impl/TimerMng.h"
42#include "arcane/impl/ParallelReplication.h"
43#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
45#include "arccore/message_passing/RequestListBase.h"
46#include "arccore/message_passing/internal/SerializeMessageList.h"
55extern "C++" ARCANE_IMPL_EXPORT
IIOMng*
73 explicit RequestList(SharedMemoryParallelMng* pm)
74 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
75 m_local_rank(m_parallel_mng->commRank()){}
80 case Parallel::WaitAll:
81 return m_message_queue->waitAll(_requests());
83 return m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),
false);
85 return m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),
true);
89 SharedMemoryParallelMng* m_parallel_mng;
98:
public ParallelMngInternal
103 : ParallelMngInternal(pm)
105 , m_window_creator(window_creator)
109 ~Impl()
override =
default;
125 return makeRef(m_window_creator->createWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type));
130 return makeRef(m_window_creator->createDynamicWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type));
140 return m_window_creator->machineRanks();
145 m_window_creator->machineBarrier();
150 SharedMemoryParallelMng* m_parallel_mng;
158SharedMemoryParallelMng::
161, m_trace(build_info.trace_mng)
162, m_thread_mng(build_info.thread_mng)
163, m_sequential_parallel_mng(build_info.sequential_parallel_mng)
164, m_timer_mng(nullptr)
166, m_world_parallel_mng(build_info.world_parallel_mng)
168, m_message_queue(build_info.message_queue)
169, m_is_parallel(build_info.nb_rank!=1)
170, m_rank(build_info.rank)
171, m_nb_rank(build_info.nb_rank)
172, m_is_initialized(false)
173, m_stat(
Parallel::createDefaultStat())
174, m_thread_barrier(build_info.thread_barrier)
175, m_all_dispatchers(build_info.all_dispatchers)
176, m_sub_builder_factory(build_info.sub_builder_factory)
177, m_parent_container_ref(build_info.container)
178, m_mpi_communicator(build_info.communicator)
180, m_parallel_mng_internal(new Impl(this, build_info.window_creator))
182 if (!m_world_parallel_mng)
183 m_world_parallel_mng =
this;
189SharedMemoryParallelMng::
190~SharedMemoryParallelMng()
192 delete m_parallel_mng_internal;
193 delete m_replication;
194 m_sequential_parallel_mng.reset();
209 DispatchCreator(ITraceMng* tm,SharedMemoryParallelMng* mpm,
210 ISharedMemoryMessageQueue* message_queue,
211 SharedMemoryAllDispatcher* all_dispatchers)
212 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue),
213 m_all_dispatchers(all_dispatchers){}
215 template<
typename DataType> SharedMemoryParallelDispatch<DataType>*
218 ISharedMemoryMessageQueue* tmq = m_message_queue;
219 SharedMemoryAllDispatcher* ad = m_all_dispatchers;
220 auto& field = ad->instance((DataType*)
nullptr);
221 return new SharedMemoryParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
225 SharedMemoryParallelMng* m_mpm;
226 ISharedMemoryMessageQueue* m_message_queue;
227 SharedMemoryAllDispatcher* m_all_dispatchers;
240 DispatchCreator creator(m_trace.get(),
this,m_message_queue,m_all_dispatchers);
241 this->createDispatchers(creator);
243 m_io_mng = arcaneCreateIOMng(
this);
254 m_trace->warning() <<
"SharedMemoryParallelMng already initialized";
267 return m_utils_factory->createGetVariablesValuesOperation(
this)._release();
273 return m_utils_factory->createTransferValuesOperation(
this)._release();
279 return m_utils_factory->createExchanger(
this)._release();
288void SharedMemoryParallelMng::
291 auto p2p_message =
buildMessage(dest_rank,Parallel::Blocking);
302 ARCANE_UNUSED(bytes);
312 return m_utils_factory->createSendSerializeMessage(
this, rank)._release();
318void SharedMemoryParallelMng::
330 m_message_queue->waitAll(requests);
333 recvSerializer(values,rank);
340void SharedMemoryParallelMng::
341recvSerializer(ISerializer* values,
Int32 rank)
343 auto p2p_message =
buildMessage(rank,Parallel::Blocking);
344 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(values));
345 m_message_queue->waitAll(ArrayView<Request>(1,&r));
354 return m_utils_factory->createReceiveSerializeMessage(
this, rank)._release();
363 ARCANE_UNUSED(requests);
374 m_stat->print(m_trace.get());
383 m_thread_barrier->wait();
393 m_message_queue->waitAll(requests);
395 m_stat->add(
"WaitAll",(end_time-begin_time),0);
402_createSerializeMessageList()
415 return m_message_queue->probe(p2p_message);
426 return m_message_queue->legacyProbe(p2p_message);
432auto SharedMemoryParallelMng::
436 return m_message_queue->addSend(p2p_message,
SendBufferInfo(values));
442auto SharedMemoryParallelMng::
455 return m_utils_factory->createSynchronizer(
this,family)._release();
464 return m_utils_factory->createSynchronizer(
this,group)._release();
473 return m_utils_factory->createTopology(
this)._release();
482 return m_replication;
491 delete m_replication;
501 ARCANE_UNUSED(kept_ranks);
514 if (kept_ranks.
empty())
523 Int32 my_new_rank = (-1);
524 for(
Integer i=0; i<nb_rank; ++i )
525 if (kept_ranks[i]==
m_rank){
533 builder = m_sub_builder_factory->_createParallelMngBuilder(nb_rank, m_mpi_communicator, m_mpi_communicator);
535 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
539 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
544 new_parallel_mng = builder->_createParallelMng(my_new_rank,
traceMng());
555 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
559 return new_parallel_mng;
576sequentialParallelMngRef()
578 return m_sequential_parallel_mng;
584 return m_sequential_parallel_mng.get();
613 return m_utils_factory;
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Vue modifiable d'un tableau d'un type T.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
constexpr bool empty() const noexcept
true si le tableau est vide (size()==0)
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Interface d'une famille d'entités.
Échange d'informations entre processeurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Interface d'un sérialiseur.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Options pour configurer les allocations.
Liste de requête de messages.
Interface d'une liste de messages de sérialisation.
Interface d'un message de sérialisation entre IMessagePassingMng.
Interface d'une file de messages avec les threads.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
void setEmiterRank(MessageRank rank)
Positionne le rang de l'émetteur du message.
Informations des buffers de réception.
Informations des buffers d'envoie.
Int32 masterParallelIORank() const override
bool isMachineShMemWinAvailable() override
Méthode permettant de savoir si le mode mémoire partagée est supporté.
MemoryAllocationOptions machineShMemWinMemoryAllocator() override
Méthode permettant de récupérer un allocateur en mémoire partagée.
void initializeWindowCreator() override
Méthode permettant d'initialiser le windowCreator spécifique à l'implémentation.
Ref< IMachineShMemWinBaseInternal > createMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire dynamique sur le noeud.
ConstArrayView< Int32 > machineRanks() override
Méthode permettant de récupérer les rangs des sous-domaines du noeud de calcul.
Int32 nbSendersToMasterParallelIO() const override
Ref< IContigMachineShMemWinBaseInternal > createContigMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
void machineBarrier() override
Méthode permettant de faire une barrière pour les sous-domaines du noeud de calcul.
Implémentation de IRequestList pour SharedMemoryParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
void build() override
Construit l'instance.
Int32 m_nb_rank
Nombre de rangs.
bool m_is_initialized
true si déjà initialisé
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
void initialize() override
Initialise le gestionnaire du parallélisme.
void barrier() override
Effectue une barière.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
Int32 m_rank
Rang de l'instance.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
IParallelReplication * replication() const override
Informations sur la réplication.
Classe de base d'une liste de requêtes.
Exception lorsqu'une fonction n'est pas implémentée.
Exception lorsqu'une opération n'est pas supportée.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
@ WaitSome
Attend que tous les messages de la liste soient traités.
eBlockingType
Type indiquant si un message est bloquant ou non.
Implémentation de la concurrence.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
double Real
Type représentant un réel.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un SharedMemoryParallelMng.