14#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/PlatformUtils.h"
19#include "arcane/utils/TraceInfo.h"
20#include "arcane/utils/Real2.h"
21#include "arcane/utils/Real3.h"
22#include "arcane/utils/Real2x2.h"
23#include "arcane/utils/Real3x3.h"
24#include "arcane/utils/ArgumentException.h"
25#include "arcane/utils/FatalErrorException.h"
26#include "arcane/utils/ITraceMng.h"
28#include "arcane/parallel/IStat.h"
30#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
31#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
33#include "arcane/core/SerializeMessage.h"
34#include "arcane/core/Timer.h"
35#include "arcane/core/IIOMng.h"
36#include "arcane/core/ISerializeMessageList.h"
37#include "arcane/core/IItemFamily.h"
39#include "arcane/impl/TimerMng.h"
40#include "arcane/impl/ParallelReplication.h"
41#include "arcane/impl/ParallelMngUtilsFactoryBase.h"
43#include "arccore/message_passing/RequestListBase.h"
44#include "arccore/message_passing/SerializeMessageList.h"
53extern "C++" ARCANE_IMPL_EXPORT IIOMng*
54arcaneCreateIOMng(IParallelMng* psm);
72 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
73 m_local_rank(m_parallel_mng->commRank()){}
78 case Parallel::WaitAll:
79 return m_message_queue->waitAll(_requests());
80 case Parallel::WaitSome:
81 return m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),
false);
82 case Parallel::WaitSomeNonBlocking:
83 return m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),
true);
95SharedMemoryParallelMng::
100, m_sequential_parallel_mng(
build_info.sequential_parallel_mng)
103, m_world_parallel_mng(
build_info.world_parallel_mng)
109, m_is_initialized(
false)
110, m_stat(Parallel::createDefaultStat())
112, m_all_dispatchers(
build_info.all_dispatchers)
113, m_sub_builder_factory(
build_info.sub_builder_factory)
118 if (!m_world_parallel_mng)
119 m_world_parallel_mng =
this;
125SharedMemoryParallelMng::
126~SharedMemoryParallelMng()
128 delete m_replication;
129 m_sequential_parallel_mng.reset();
144 DispatchCreator(ITraceMng* tm,SharedMemoryParallelMng* mpm,
145 ISharedMemoryMessageQueue* message_queue,
146 SharedMemoryAllDispatcher* all_dispatchers)
147 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue),
148 m_all_dispatchers(all_dispatchers){}
150 template<
typename DataType> SharedMemoryParallelDispatch<DataType>*
153 ISharedMemoryMessageQueue* tmq = m_message_queue;
154 SharedMemoryAllDispatcher* ad = m_all_dispatchers;
155 auto& field = ad->instance((DataType*)
nullptr);
156 return new SharedMemoryParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
160 SharedMemoryParallelMng* m_mpm;
161 ISharedMemoryMessageQueue* m_message_queue;
162 SharedMemoryAllDispatcher* m_all_dispatchers;
175 DispatchCreator creator(m_trace.get(),
this,m_message_queue,m_all_dispatchers);
176 this->createDispatchers(creator);
178 m_io_mng = arcaneCreateIOMng(
this);
189 m_trace->
warning() <<
"SharedMemoryParallelMng already initialized";
202 return m_utils_factory->createGetVariablesValuesOperation(
this)._release();
208 return m_utils_factory->createTransferValuesOperation(
this)._release();
214 return m_utils_factory->createExchanger(
this)._release();
223void SharedMemoryParallelMng::
237 ARCANE_UNUSED(bytes);
253void SharedMemoryParallelMng::
265 m_message_queue->waitAll(requests);
268 recvSerializer(values,rank);
275void SharedMemoryParallelMng::
276recvSerializer(ISerializer* values,
Int32 rank)
278 auto p2p_message =
buildMessage(rank,Parallel::Blocking);
279 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(values));
280 m_message_queue->waitAll(ArrayView<Request>(1,&r));
298 ARCANE_UNUSED(requests);
309 m_stat->
print(m_trace.get());
318 m_thread_barrier->
wait();
328 m_message_queue->waitAll(requests);
329 Real
end_time = platform::getRealTime();
337_createSerializeMessageList()
367auto SharedMemoryParallelMng::
377auto SharedMemoryParallelMng::
381 return m_message_queue->addReceive(
p2p_message,ReceiveBufferInfo(values));
390 return m_utils_factory->createSynchronizer(
this,family)._release();
399 return m_utils_factory->createSynchronizer(
this,group)._release();
408 return m_utils_factory->createTopology(
this)._release();
417 return m_replication;
426 delete m_replication;
459 for( Integer i=0; i<nb_rank; ++i )
470 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
474 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
490 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
511sequentialParallelMngRef()
513 return m_sequential_parallel_mng;
519 return m_sequential_parallel_mng.get();
548 return m_utils_factory;
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Tableau d'items de types quelconques.
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface d'une famille d'entités.
Echange d'informations entre processeurs.
virtual Ref< IParallelMngContainer > _createParallelMngBuilder(Int32 nb_local_rank, Parallel::Communicator communicator)=0
Créé un conteneur pour nb_local_rank rangs locaux et avec comme communicateur communicator.
Interface du gestionnaire de parallélisme pour un sous-domaine.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Interface d'une file de messages avec les threads.
Informations des buffers d'envoie.
Implémentation de IRequestList pour SharedMemoryParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant les threads.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
void build() override
Construit l'instance.
Int32 m_nb_rank
Nombre de rangs.
bool m_is_initialized
true si déjà initialisé
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
void initialize() override
Initialise le gestionnaire du parallélisme.
void barrier() override
Effectue une barière.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
Int32 m_rank
Rang de l'instance.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
IParallelReplication * replication() const override
Informations sur la réplication.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
virtual void add(const String &name, double elapsed_time, Int64 msg_size)=0
Ajoute une statistique.
virtual void print(ITraceMng *trace)=0
Imprime sur trace les statistiques.
Message utilisant un SerializeBuffer.
Vue constante d'un tableau de type T.
Interface d'un sérialiseur.
virtual bool wait()=0
Bloque et attend que tous les threads appellent cette méthode.
Liste de requête de messages.
Interface d'une liste de messages de sérialisation.
Interface d'un message de sérialisation entre IMessagePassingMng.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
Classe de base d'une liste de requêtes.
Exception lorsqu'une fonction n'est pas implémentée.
Exception lorsqu'une opération n'est pas supportée.
TraceMessage warning() const
Flot pour un message d'avertissement.
Positionne une classe de message.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
eBlockingType
Type indiquant si un message est bloquant ou non.
Infos pour construire un SharedMemoryParallelMng.