14#include "arcane/utils/Array.h"
15#include "arcane/utils/HPReal.h"
16#include "arcane/utils/NumericTypes.h"
17#include "arcane/utils/NotImplementedException.h"
18#include "arcane/utils/ScopedPtr.h"
19#include "arcane/utils/ITraceMng.h"
20#include "arcane/utils/ValueConvert.h"
21#include "arcane/utils/FatalErrorException.h"
23#include "arcane/core/ParallelMngDispatcher.h"
24#include "arcane/core/IParallelDispatch.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ITimeStats.h"
27#include "arcane/core/IParallelNonBlockingCollective.h"
28#include "arcane/core/internal/IParallelMngInternal.h"
30#include "arcane/accelerator/core/Runner.h"
31#include "arcane/accelerator/core/RunQueueBuildInfo.h"
33#include "arccore/message_passing/Dispatchers.h"
35#include "arccore/message_passing/MessagePassingMng.h"
36#include "arccore/message_passing/IControlDispatcher.h"
37#include "arccore/message_passing/ISerializeDispatcher.h"
38#include "arccore/message_passing/PointToPointMessageInfo.h"
39#include "arccore/message_passing/IRequestList.h"
40#include "arccore/message_passing/ISerializeMessageList.h"
41#include "arccore/trace/TimeMetric.h"
53ParallelMngDispatcherBuildInfo::
56: m_comm_rank(mpm->commRank())
57, m_comm_size(mpm->commSize())
58, m_dispatchers(dispatchers)
59, m_message_passing_mng(mpm)
67ParallelMngDispatcherBuildInfo::
70: m_comm_rank(mpm_ref->commRank())
71, m_comm_size(mpm_ref->commSize())
72, m_dispatchers(dispatchers.get())
73, m_dispatchers_ref(dispatchers)
74, m_message_passing_mng(mpm_ref.get())
75, m_message_passing_mng_ref(mpm_ref)
83ParallelMngDispatcherBuildInfo::
84ParallelMngDispatcherBuildInfo(
Int32 comm_rank,
Int32 comm_size)
85: m_comm_rank(comm_rank)
86, m_comm_size(comm_size)
87, m_dispatchers(nullptr)
88, m_message_passing_mng(nullptr)
96void ParallelMngDispatcherBuildInfo::
100 m_dispatchers_ref = createRef<MP::Dispatchers>();
101 m_dispatchers = m_dispatchers_ref.get();
103 if (!m_message_passing_mng){
105 m_message_passing_mng = x;
106 m_message_passing_mng_ref =
makeRef(x);
108 if (!m_message_passing_mng_ref.get())
109 m_message_passing_mng_ref =
makeRef(m_message_passing_mng);
115ParallelMngDispatcher::DefaultControlDispatcher::
116DefaultControlDispatcher(IParallelMng* pm)
122void ParallelMngDispatcher::DefaultControlDispatcher::
125 m_parallel_mng->waitAllRequests(requests);
128void ParallelMngDispatcher::DefaultControlDispatcher::
130 bool is_non_blocking)
134 done_requests = m_parallel_mng->testSomeRequests(requests);
136 done_requests = m_parallel_mng->waitSomeRequests(requests);
138 for(
int x : done_requests )
149void ParallelMngDispatcher::DefaultControlDispatcher::
152 m_parallel_mng->barrier();
155Request ParallelMngDispatcher::DefaultControlDispatcher::
158 return m_parallel_mng->nonBlockingCollective()->barrier();
161MessageId ParallelMngDispatcher::DefaultControlDispatcher::
164 return m_parallel_mng->probe(message);
170 return m_parallel_mng->legacyProbe(message);
174createRequestListRef()
176 return m_parallel_mng->createRequestListRef();
182void ParallelMngDispatcher::DefaultControlDispatcher::
204 return m_parallel_mng->createSerializeMessageListRef();
208 return m_parallel_mng->sendSerializer(s,message);
212 return m_parallel_mng->receiveSerializer(s,message);
228 , m_runner(Accelerator::eExecutionPolicy::Sequential)
232 m_is_accelerator_aware_disabled = (v.value()!=0);
243 if (m_is_accelerator_aware_disabled)
245 if (m_queue.isNull())
247 if (!m_queue.isAcceleratorPolicy())
249 return m_parallel_mng->_isAcceleratorAware();
251 void setDefaultRunner(
const Runner& runner)
override
253 if (!m_runner.isInitialized())
259 m_queue.setAsync(
true);
263 return m_parallel_mng->_createSubParallelMngRef(color, key);
271 bool m_is_accelerator_aware_disabled =
false;
280ParallelMngDispatcher::
303, m_mp_dispatchers_ref(
bi.dispatchersRef())
304, m_message_passing_mng_ref(
bi.messagePassingMngRef())
305, m_control_dispatcher(
new DefaultControlDispatcher(this))
306, m_serialize_dispatcher(
new SerializeDispatcher(this))
307, m_parallel_mng_internal(
new Impl(this))
314ParallelMngDispatcher::
315~ParallelMngDispatcher()
317 m_mp_dispatchers_ref.reset();
319 delete m_parallel_mng_internal;
321 delete m_serialize_dispatcher;
322 delete m_control_dispatcher;
324 delete m_signed_char;
325 delete m_unsigned_char;
327 delete m_unsigned_short;
329 delete m_unsigned_int;
331 delete m_unsigned_long;
333 delete m_unsigned_long_long;
337 delete m_long_double;
348void ParallelMngDispatcher::
351 delete m_control_dispatcher;
352 m_control_dispatcher = d;
358void ParallelMngDispatcher::
361 delete m_serialize_dispatcher;
362 m_serialize_dispatcher = d;
368void ParallelMngDispatcher::
369_setArccoreDispatchers()
371 m_mp_dispatchers_ref->setDispatcher(m_char->toArccoreDispatcher());
372 m_mp_dispatchers_ref->setDispatcher(m_signed_char->toArccoreDispatcher());
373 m_mp_dispatchers_ref->setDispatcher(m_unsigned_char->toArccoreDispatcher());
374 m_mp_dispatchers_ref->setDispatcher(m_short->toArccoreDispatcher());
375 m_mp_dispatchers_ref->setDispatcher(m_unsigned_short->toArccoreDispatcher());
376 m_mp_dispatchers_ref->setDispatcher(m_int->toArccoreDispatcher());
377 m_mp_dispatchers_ref->setDispatcher(m_unsigned_int->toArccoreDispatcher());
378 m_mp_dispatchers_ref->setDispatcher(m_long->toArccoreDispatcher());
379 m_mp_dispatchers_ref->setDispatcher(m_unsigned_long->toArccoreDispatcher());
380 m_mp_dispatchers_ref->setDispatcher(m_long_long->toArccoreDispatcher());
381 m_mp_dispatchers_ref->setDispatcher(m_unsigned_long_long->toArccoreDispatcher());
382 m_mp_dispatchers_ref->setDispatcher(m_float->toArccoreDispatcher());
383 m_mp_dispatchers_ref->setDispatcher(m_double->toArccoreDispatcher());
384 m_mp_dispatchers_ref->setDispatcher(m_long_double->toArccoreDispatcher());
387 m_mp_dispatchers_ref->setDispatcher(m_control_dispatcher);
389 m_mp_dispatchers_ref->setDispatcher(m_serialize_dispatcher);
398 return m_message_passing_mng_ref.get();
423 _messagePassingMng()->setTimeMetricCollector(
c);
431_communicationTimeMetricAction()
const
433 return Timer::phaseAction(
timeStats(),TP_Communication);
439void ParallelMngDispatcher::
446 len_info[0] = bytes.
size();
449 broadcast(utf8_array,rank);
454 broadcast(utf8_array,rank);
455 str = String::fromUtf8(utf8_array);
467 size = bytes.largeSize();
476 broadcast(bytes,rank);
514 return makeRef(_createSerializeMessageList());
523 return _createSerializeMessageList();
541 return makeRef(_createSubParallelMng(
kept_ranks));
559 Timer::Phase tphase(
timeStats(),TP_Communication);
562 request_list->add(requests);
563 request_list->wait(wait_type);
565 for (
Integer i=0; i<nb_request; ++i )
566 requests[i] = request_list->request(i);
567 return request_list->doneRequestIndexes();
576 return _doWaitRequests(requests,Parallel::WaitSome);
585 return _doWaitRequests(requests,Parallel::WaitSomeNonBlocking);
591#define ARCANE_PARALLEL_MANAGER_DISPATCH(field,type)\
592void ParallelMngDispatcher::\
593allGather(ConstArrayView<type> send_buf,ArrayView<type> recv_buf)\
595 Timer::Phase tphase(timeStats(),TP_Communication);\
596 field->allGather(send_buf,recv_buf);\
598void ParallelMngDispatcher::\
599gather(ConstArrayView<type> send_buf,ArrayView<type> recv_buf,Integer rank) \
601 Timer::Phase tphase(timeStats(),TP_Communication);\
602 field->gather(send_buf,recv_buf,rank); \
604void ParallelMngDispatcher::\
605allGatherVariable(ConstArrayView<type> send_buf,Array<type>& recv_buf)\
607 Timer::Phase tphase(timeStats(),TP_Communication);\
608 field->allGatherVariable(send_buf,recv_buf);\
610void ParallelMngDispatcher::\
611gatherVariable(ConstArrayView<type> send_buf,Array<type>& recv_buf,Integer rank) \
613 Timer::Phase tphase(timeStats(),TP_Communication);\
614 field->gatherVariable(send_buf,recv_buf,rank); \
616void ParallelMngDispatcher::\
617scatterVariable(ConstArrayView<type> send_buf,ArrayView<type> recv_buf,Integer root)\
619 Timer::Phase tphase(timeStats(),TP_Communication);\
620 field->scatterVariable(send_buf,recv_buf,root);\
622type ParallelMngDispatcher::\
623reduce(eReduceType rt,type v)\
625 Timer::Phase tphase(timeStats(),TP_Communication);\
626 return field->allReduce(rt,v);\
628void ParallelMngDispatcher::\
629reduce(eReduceType rt,ArrayView<type> v)\
631 Timer::Phase tphase(timeStats(),TP_Communication);\
632 field->allReduce(rt,v);\
634void ParallelMngDispatcher::\
635broadcast(ArrayView<type> send_buf,Integer id)\
637 Timer::Phase tphase(timeStats(),TP_Communication);\
638 field->broadcast(send_buf,id);\
640void ParallelMngDispatcher::\
641send(ConstArrayView<type> values,Integer id)\
643 Timer::Phase tphase(timeStats(),TP_Communication);\
644 field->send(values,id);\
646void ParallelMngDispatcher::\
647recv(ArrayView<type> values,Integer id)\
649 Timer::Phase tphase(timeStats(),TP_Communication);\
650 field->recv(values,id);\
652Parallel::Request ParallelMngDispatcher::\
653send(ConstArrayView<type> values,Integer id,bool is_blocked)\
655 Timer::Phase tphase(timeStats(),TP_Communication);\
656 return field->send(values,id,is_blocked);\
658Request ParallelMngDispatcher::\
659send(Span<const type> values,const PointToPointMessageInfo& message) \
661 Timer::Phase tphase(timeStats(),TP_Communication);\
662 return field->send(values,message);\
664Parallel::Request ParallelMngDispatcher::\
665recv(ArrayView<type> values,Integer id,bool is_blocked)\
667 Timer::Phase tphase(timeStats(),TP_Communication);\
668 return field->recv(values,id,is_blocked);\
670Request ParallelMngDispatcher::\
671receive(Span<type> values,const PointToPointMessageInfo& message) \
673 Timer::Phase tphase(timeStats(),TP_Communication);\
674 return field->receive(values,message);\
676void ParallelMngDispatcher::\
677sendRecv(ConstArrayView<type> send_buf,ArrayView<type> recv_buf,Integer id)\
679 Timer::Phase tphase(timeStats(),TP_Communication);\
680 field->sendRecv(send_buf,recv_buf,id);\
682void ParallelMngDispatcher::\
683allToAll(ConstArrayView<type> send_buf,ArrayView<type> recv_buf,Integer count)\
685 Timer::Phase tphase(timeStats(),TP_Communication);\
686 field->allToAll(send_buf,recv_buf,count);\
688void ParallelMngDispatcher::\
689allToAllVariable(ConstArrayView<type> send_buf,Int32ConstArrayView send_count,\
690 Int32ConstArrayView send_index,ArrayView<type> recv_buf,\
691 Int32ConstArrayView recv_count,Int32ConstArrayView recv_index)\
693 Timer::Phase tphase(timeStats(),TP_Communication);\
694 field->allToAllVariable(send_buf,send_count,send_index,recv_buf,recv_count,recv_index);\
696type ParallelMngDispatcher::\
697scan(eReduceType rt,type v)\
699 Timer::Phase tphase(timeStats(),TP_Communication);\
700 return field->scan(rt,v);\
702void ParallelMngDispatcher::\
703computeMinMaxSum(type val,type& min_val,type& max_val,type& sum_val,Int32& min_proc,Int32& max_proc)\
705 Timer::Phase tphase(timeStats(),TP_Communication);\
706 field->computeMinMaxSum(val,min_val,max_val,sum_val,min_proc,max_proc);\
708IParallelDispatchT<type>* ParallelMngDispatcher::\
713void ParallelMngDispatcher::\
714computeMinMaxSum(ConstArrayView<type> values, \
715 ArrayView<type> min_values, \
716 ArrayView<type> max_values, \
717 ArrayView<type> sum_values, \
718 ArrayView<Int32> min_ranks, \
719 ArrayView<Int32> max_ranks) \
721 Timer::Phase tphase(timeStats(),TP_Communication);\
722 field->computeMinMaxSum(values,min_values,max_values,sum_values,min_ranks,max_ranks);\
724void ParallelMngDispatcher::\
725scan(eReduceType rt,ArrayView<type> v)\
727 Timer::Phase tphase(timeStats(),TP_Communication);\
731ARCANE_PARALLEL_MANAGER_DISPATCH(m_char,
char)
732ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_char,
unsigned char)
733ARCANE_PARALLEL_MANAGER_DISPATCH(m_signed_char,
signed char)
734ARCANE_PARALLEL_MANAGER_DISPATCH(m_short,
short)
735ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_short,
unsigned short)
736ARCANE_PARALLEL_MANAGER_DISPATCH(m_int,
int)
737ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_int,
unsigned int)
738ARCANE_PARALLEL_MANAGER_DISPATCH(m_long,
long)
739ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_long,
unsigned long)
740ARCANE_PARALLEL_MANAGER_DISPATCH(m_long_long,
long long)
741ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_long_long,
unsigned long long)
742ARCANE_PARALLEL_MANAGER_DISPATCH(m_float,
float)
743ARCANE_PARALLEL_MANAGER_DISPATCH(m_double,
double)
744ARCANE_PARALLEL_MANAGER_DISPATCH(m_long_double,
long double)
745ARCANE_PARALLEL_MANAGER_DISPATCH(m_apreal,
APReal)
746ARCANE_PARALLEL_MANAGER_DISPATCH(m_real2,
Real2)
747ARCANE_PARALLEL_MANAGER_DISPATCH(m_real3,
Real3)
748ARCANE_PARALLEL_MANAGER_DISPATCH(m_real2x2,
Real2x2)
749ARCANE_PARALLEL_MANAGER_DISPATCH(m_real3x3,
Real3x3)
750ARCANE_PARALLEL_MANAGER_DISPATCH(m_hpreal,
HPReal)
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
#define ARCCORE_DEFINE_REFERENCE_COUNTED_CLASS(class_name)
Macro pour définir les méthodes et types une classe qui utilise un compteur de référence.
Informations pour créer une RunQueue.
File d'exécution pour un accélérateur.
Gestionnaire d'exécution pour accélérateur.
Tableau d'items de types quelconques.
Classe implémentant un réel Haute Précision.
Interface d'un conteneur de 'IParallelMng'.
Partie interne de IParallelMng.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual Ref< Parallel::IRequestList > createRequestListRef()=0
Créé une liste de requêtes pour ce gestionnaire.
Interface gérant les statistiques sur les temps d'exécution.
virtual ITimeMetricCollector * metricCollector()=0
Interface de collection associée.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
bool isAcceleratorAware() const override
Indique si l'implémentation gère les accélérateurs.
RunQueue queue() const override
File par défaut pour les messages. Peut être nul.
Runner runner() const override
Runner par défaut. Peut être nul.
Ref< IParallelMng > createSubParallelMngRef(Int32 color, Int32 key) override
Créé un sous IParallelMng de manière similaire à MPI_Comm_split.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Message de réception.
Ref< ISerializeMessageList > createSerializeMessageListRef() override
Créé une liste de messages de sérialisation.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
ISerializeMessageList * createSerializeMessageList() final
Créé une liste pour gérer les 'ISerializeMessage'.
void setTimeStats(ITimeStats *ts) override
Positionne le gestionnaire de statistiques.
void allGather(ISerializer *send_serializer, ISerializer *recv_serializer) override
Redéfinit ici allGather pour éviter de cacher le symbole dans les classes dérivées.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
void processMessages(ConstArrayView< ISerializeMessage * > messages) override
Exécute les opérations des messages messages.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeMetricCollector * timeMetricCollector() const override
Collecteur Arccore des statistiques temporelles (peut être nul)
IParallelMng * createSubParallelMng(Int32ConstArrayView kept_ranks) final
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Test si une des requêtes rvalues est terminée.
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Ref< ISerializeMessageList > createSerializeMessageListRef() final
Créé une liste pour gérer les 'ISerializeMessage'.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Bloque en attendant qu'une des requêtes rvalues soit terminée.
void broadcastMemoryBuffer(ByteArray &bytes, Int32 rank) override
Effectue un broadcast d'une zone mémoire.
Classe gérant un vecteur de réel de dimension 2.
Classe gérant une matrice de réel de dimension 2x2.
Classe gérant un vecteur de réel de dimension 3.
Classe gérant une matrice de réel de dimension 3x3.
Positionne la phase de l'action en cours d'exécution.
Emulation de réel en précision arbitraire.
Vue modifiable d'un tableau d'un type T.
void fill(const T &o) noexcept
Remplit le tableau avec la valeur o.
constexpr Integer size() const noexcept
Retourne la taille du tableau.
Vue constante d'un tableau de type T.
Interface d'un sérialiseur.
Interface gérant les statistiques sur l'exécution.
Interface du conteneur des dispatchers.
Manage control streams for parallel messages.
Interface du gestionnaire des échanges de messages.
Interface d'un profiler pour les échanges de messages.
Interface des messages de sérialisation.
Interface d'une liste de messages de sérialisation.
Interface d'un message de sérialisation entre IMessagePassingMng.
Gestionnaire des échanges de messages.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
Exception lorsqu'une fonction n'est pas implémentée.
Référence à une instance.
constexpr ARCCORE_HOST_DEVICE SizeType size() const noexcept
Retourne la taille du tableau.
Vue d'un tableau d'éléments de type T.
Chaîne de caractères unicode.
Span< const Byte > bytes() const
Retourne la conversion de l'instance dans l'encodage UTF-8.
Sentinelle pour collecter les informations temporelles.
Vecteur 1D de données avec sémantique par valeur (style STL).
RunQueue makeQueue(const Runner &runner)
Créé une file associée à runner.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
UniqueArray< Byte > ByteUniqueArray
Tableau dynamique à une dimension de caractères.
Espace de nommage contenant les types et déclarations qui gèrent le mécanisme de parallélisme par éch...
void mpAllGather(IMessagePassingMng *pm, const ISerializer *send_serializer, ISerializer *receive_serialize)
Message allGather() pour une sérialisation.
Espace de nom de Arccore.
Int32 Integer
Type représentant un entier.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int64_t Int64
Type entier signé sur 64 bits.
std::int32_t Int32
Type entier signé sur 32 bits.