14#include "arcane/utils/Array.h"
15#include "arcane/utils/HPReal.h"
16#include "arcane/utils/NumericTypes.h"
17#include "arcane/utils/NotImplementedException.h"
18#include "arcane/utils/ScopedPtr.h"
19#include "arcane/utils/ITraceMng.h"
20#include "arcane/utils/ValueConvert.h"
21#include "arcane/utils/FatalErrorException.h"
23#include "arcane/core/ParallelMngDispatcher.h"
24#include "arcane/core/IParallelDispatch.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ITimeStats.h"
27#include "arcane/core/IParallelNonBlockingCollective.h"
28#include "arcane/core/internal/IParallelMngInternal.h"
30#include "arcane/accelerator/core/Runner.h"
31#include "arcane/accelerator/core/RunQueueBuildInfo.h"
33#include "arccore/message_passing/Dispatchers.h"
35#include "arccore/message_passing/MessagePassingMng.h"
36#include "arccore/message_passing/IControlDispatcher.h"
37#include "arccore/message_passing/ISerializeDispatcher.h"
38#include "arccore/message_passing/PointToPointMessageInfo.h"
39#include "arccore/message_passing/IRequestList.h"
40#include "arccore/message_passing/ISerializeMessageList.h"
41#include "arccore/trace/TimeMetric.h"
53ParallelMngDispatcherBuildInfo::
56: m_comm_rank(mpm->commRank())
57, m_comm_size(mpm->commSize())
58, m_dispatchers(dispatchers)
59, m_message_passing_mng(mpm)
67ParallelMngDispatcherBuildInfo::
68ParallelMngDispatcherBuildInfo(Ref<MP::Dispatchers> dispatchers,
69 Ref<MP::MessagePassingMng> mpm_ref)
70: m_comm_rank(mpm_ref->commRank())
71, m_comm_size(mpm_ref->commSize())
72, m_dispatchers(dispatchers.get())
73, m_dispatchers_ref(dispatchers)
74, m_message_passing_mng(mpm_ref.get())
75, m_message_passing_mng_ref(mpm_ref)
83ParallelMngDispatcherBuildInfo::
84ParallelMngDispatcherBuildInfo(Int32 comm_rank,Int32 comm_size)
85: m_comm_rank(comm_rank)
86, m_comm_size(comm_size)
87, m_dispatchers(nullptr)
88, m_message_passing_mng(nullptr)
96void ParallelMngDispatcherBuildInfo::
100 m_dispatchers_ref = createRef<MP::Dispatchers>();
101 m_dispatchers = m_dispatchers_ref.get();
103 if (!m_message_passing_mng){
105 m_message_passing_mng = x;
106 m_message_passing_mng_ref = makeRef(x);
108 if (!m_message_passing_mng_ref.get())
109 m_message_passing_mng_ref = makeRef(m_message_passing_mng);
115ParallelMngDispatcher::DefaultControlDispatcher::
116DefaultControlDispatcher(IParallelMng* pm)
122void ParallelMngDispatcher::DefaultControlDispatcher::
125 m_parallel_mng->waitAllRequests(requests);
128void ParallelMngDispatcher::DefaultControlDispatcher::
130 bool is_non_blocking)
134 done_requests = m_parallel_mng->testSomeRequests(requests);
136 done_requests = m_parallel_mng->waitSomeRequests(requests);
138 for(
int x : done_requests )
149void ParallelMngDispatcher::DefaultControlDispatcher::
152 m_parallel_mng->barrier();
155Request ParallelMngDispatcher::DefaultControlDispatcher::
158 return m_parallel_mng->nonBlockingCollective()->barrier();
161MessageId ParallelMngDispatcher::DefaultControlDispatcher::
164 return m_parallel_mng->probe(message);
170 return m_parallel_mng->legacyProbe(message);
173Ref<Parallel::IRequestList> ParallelMngDispatcher::DefaultControlDispatcher::
174createRequestListRef()
176 return m_parallel_mng->createRequestListRef();
182void ParallelMngDispatcher::DefaultControlDispatcher::
204 return m_parallel_mng->createSerializeMessageListRef();
208 return m_parallel_mng->sendSerializer(s,message);
212 return m_parallel_mng->receiveSerializer(s,message);
222:
public IParallelMngInternal
228 , m_runner(Accelerator::eExecutionPolicy::Sequential)
232 m_is_accelerator_aware_disabled = (v.value()!=0);
239 Runner runner()
const override {
return m_runner; }
240 RunQueue queue()
const override {
return m_queue; }
241 bool isAcceleratorAware()
const override
243 if (m_is_accelerator_aware_disabled)
245 if (m_queue.isNull())
247 if (!m_queue.isAcceleratorPolicy())
249 return m_parallel_mng->_isAcceleratorAware();
251 void setDefaultRunner(
const Runner& runner)
override
253 if (!m_runner.isInitialized())
258 m_queue =
makeQueue(m_runner,build_info);
259 m_queue.setAsync(
true);
261 Ref<IParallelMng> createSubParallelMngRef(
Int32 color,
Int32 key)
override
263 return m_parallel_mng->_createSubParallelMngRef(color, key);
271 bool m_is_accelerator_aware_disabled =
false;
280ParallelMngDispatcher::
283, m_unsigned_char(nullptr)
284, m_signed_char(nullptr)
286, m_unsigned_short(nullptr)
288, m_unsigned_int(nullptr)
290, m_unsigned_long(nullptr)
291, m_long_long(nullptr)
292, m_unsigned_long_long(nullptr)
295, m_long_double(nullptr)
302, m_time_stats(nullptr)
303, m_mp_dispatchers_ref(bi.dispatchersRef())
304, m_message_passing_mng_ref(bi.messagePassingMngRef())
307, m_parallel_mng_internal(new
Impl(this))
314ParallelMngDispatcher::
315~ParallelMngDispatcher()
317 m_mp_dispatchers_ref.reset();
319 delete m_parallel_mng_internal;
321 delete m_serialize_dispatcher;
322 delete m_control_dispatcher;
324 delete m_signed_char;
325 delete m_unsigned_char;
327 delete m_unsigned_short;
329 delete m_unsigned_int;
331 delete m_unsigned_long;
333 delete m_unsigned_long_long;
337 delete m_long_double;
348void ParallelMngDispatcher::
351 delete m_control_dispatcher;
352 m_control_dispatcher = d;
358void ParallelMngDispatcher::
361 delete m_serialize_dispatcher;
362 m_serialize_dispatcher = d;
368void ParallelMngDispatcher::
369_setArccoreDispatchers()
371 m_mp_dispatchers_ref->setDispatcher(m_char->toArccoreDispatcher());
372 m_mp_dispatchers_ref->setDispatcher(m_signed_char->toArccoreDispatcher());
373 m_mp_dispatchers_ref->setDispatcher(m_unsigned_char->toArccoreDispatcher());
374 m_mp_dispatchers_ref->setDispatcher(m_short->toArccoreDispatcher());
375 m_mp_dispatchers_ref->setDispatcher(m_unsigned_short->toArccoreDispatcher());
376 m_mp_dispatchers_ref->setDispatcher(m_int->toArccoreDispatcher());
377 m_mp_dispatchers_ref->setDispatcher(m_unsigned_int->toArccoreDispatcher());
378 m_mp_dispatchers_ref->setDispatcher(m_long->toArccoreDispatcher());
379 m_mp_dispatchers_ref->setDispatcher(m_unsigned_long->toArccoreDispatcher());
380 m_mp_dispatchers_ref->setDispatcher(m_long_long->toArccoreDispatcher());
381 m_mp_dispatchers_ref->setDispatcher(m_unsigned_long_long->toArccoreDispatcher());
382 m_mp_dispatchers_ref->setDispatcher(m_float->toArccoreDispatcher());
383 m_mp_dispatchers_ref->setDispatcher(m_double->toArccoreDispatcher());
384 m_mp_dispatchers_ref->setDispatcher(m_long_double->toArccoreDispatcher());
387 m_mp_dispatchers_ref->setDispatcher(m_control_dispatcher);
389 m_mp_dispatchers_ref->setDispatcher(m_serialize_dispatcher);
398 return m_message_passing_mng_ref.get();
423 _messagePassingMng()->setTimeMetricCollector(c);
431_communicationTimeMetricAction()
const
433 return Timer::phaseAction(
timeStats(),TP_Communication);
439void ParallelMngDispatcher::
440broadcastString(
String& str,Int32 rank)
446 len_info[0] = bytes.
size();
449 broadcast(utf8_array,rank);
454 broadcast(utf8_array,rank);
455 str = String::fromUtf8(utf8_array);
476 broadcast(bytes,rank);
486 mpAllGather(_messagePassingMng(), send_serializer, recv_serializer);
499 for(
Integer i=0; i<nb_message; ++i ){
501 message_list->addMessage(m);
503 message_list->processPendingMessages();
505 message_list->waitMessages(Parallel::WaitAll);
514 return makeRef(_createSerializeMessageList());
523 return _createSerializeMessageList();
532 return _createSubParallelMng(kept_ranks);
541 return makeRef(_createSubParallelMng(kept_ranks));
547Ref<IParallelMng> ParallelMngDispatcher::
548_createSubParallelMngRef([[maybe_unused]]
Int32 color, [[maybe_unused]]
Int32 key)
559 Timer::Phase tphase(
timeStats(),TP_Communication);
562 request_list->add(requests);
563 request_list->wait(wait_type);
565 for (
Integer i=0; i<nb_request; ++i )
566 requests[i] = request_list->request(i);
567 return request_list->doneRequestIndexes();
576 return _doWaitRequests(requests,Parallel::WaitSome);
585 return _doWaitRequests(requests,Parallel::WaitSomeNonBlocking);
591#define ARCANE_PARALLEL_MANAGER_DISPATCH(field,type)\
592void ParallelMngDispatcher::\
593allGather(ConstArrayView<type> send_buf,ArrayView<type> recv_buf)\
595 Timer::Phase tphase(timeStats(),TP_Communication);\
596 field->allGather(send_buf,recv_buf);\
598void ParallelMngDispatcher::\
599gather(ConstArrayView<type> send_buf,ArrayView<type> recv_buf,Integer rank) \
601 Timer::Phase tphase(timeStats(),TP_Communication);\
602 field->gather(send_buf,recv_buf,rank); \
604void ParallelMngDispatcher::\
605allGatherVariable(ConstArrayView<type> send_buf,Array<type>& recv_buf)\
607 Timer::Phase tphase(timeStats(),TP_Communication);\
608 field->allGatherVariable(send_buf,recv_buf);\
610void ParallelMngDispatcher::\
611gatherVariable(ConstArrayView<type> send_buf,Array<type>& recv_buf,Integer rank) \
613 Timer::Phase tphase(timeStats(),TP_Communication);\
614 field->gatherVariable(send_buf,recv_buf,rank); \
616void ParallelMngDispatcher::\
617scatterVariable(ConstArrayView<type> send_buf,ArrayView<type> recv_buf,Integer root)\
619 Timer::Phase tphase(timeStats(),TP_Communication);\
620 field->scatterVariable(send_buf,recv_buf,root);\
622type ParallelMngDispatcher::\
623reduce(eReduceType rt,type v)\
625 Timer::Phase tphase(timeStats(),TP_Communication);\
626 return field->allReduce(rt,v);\
628void ParallelMngDispatcher::\
629reduce(eReduceType rt,ArrayView<type> v)\
631 Timer::Phase tphase(timeStats(),TP_Communication);\
632 field->allReduce(rt,v);\
634void ParallelMngDispatcher::\
635broadcast(ArrayView<type> send_buf,Integer id)\
637 Timer::Phase tphase(timeStats(),TP_Communication);\
638 field->broadcast(send_buf,id);\
640void ParallelMngDispatcher::\
641send(ConstArrayView<type> values,Integer id)\
643 Timer::Phase tphase(timeStats(),TP_Communication);\
644 field->send(values,id);\
646void ParallelMngDispatcher::\
647recv(ArrayView<type> values,Integer id)\
649 Timer::Phase tphase(timeStats(),TP_Communication);\
650 field->recv(values,id);\
652Parallel::Request ParallelMngDispatcher::\
653send(ConstArrayView<type> values,Integer id,bool is_blocked)\
655 Timer::Phase tphase(timeStats(),TP_Communication);\
656 return field->send(values,id,is_blocked);\
658Request ParallelMngDispatcher::\
659send(Span<const type> values,const PointToPointMessageInfo& message) \
661 Timer::Phase tphase(timeStats(),TP_Communication);\
662 return field->send(values,message);\
664Parallel::Request ParallelMngDispatcher::\
665recv(ArrayView<type> values,Integer id,bool is_blocked)\
667 Timer::Phase tphase(timeStats(),TP_Communication);\
668 return field->recv(values,id,is_blocked);\
670Request ParallelMngDispatcher::\
671receive(Span<type> values,const PointToPointMessageInfo& message) \
673 Timer::Phase tphase(timeStats(),TP_Communication);\
674 return field->receive(values,message);\
676void ParallelMngDispatcher::\
677sendRecv(ConstArrayView<type> send_buf,ArrayView<type> recv_buf,Integer id)\
679 Timer::Phase tphase(timeStats(),TP_Communication);\
680 field->sendRecv(send_buf,recv_buf,id);\
682void ParallelMngDispatcher::\
683allToAll(ConstArrayView<type> send_buf,ArrayView<type> recv_buf,Integer count)\
685 Timer::Phase tphase(timeStats(),TP_Communication);\
686 field->allToAll(send_buf,recv_buf,count);\
688void ParallelMngDispatcher::\
689allToAllVariable(ConstArrayView<type> send_buf,Int32ConstArrayView send_count,\
690 Int32ConstArrayView send_index,ArrayView<type> recv_buf,\
691 Int32ConstArrayView recv_count,Int32ConstArrayView recv_index)\
693 Timer::Phase tphase(timeStats(),TP_Communication);\
694 field->allToAllVariable(send_buf,send_count,send_index,recv_buf,recv_count,recv_index);\
696type ParallelMngDispatcher::\
697scan(eReduceType rt,type v)\
699 Timer::Phase tphase(timeStats(),TP_Communication);\
700 return field->scan(rt,v);\
702void ParallelMngDispatcher::\
703computeMinMaxSum(type val,type& min_val,type& max_val,type& sum_val,Int32& min_proc,Int32& max_proc)\
705 Timer::Phase tphase(timeStats(),TP_Communication);\
706 field->computeMinMaxSum(val,min_val,max_val,sum_val,min_proc,max_proc);\
708IParallelDispatchT<type>* ParallelMngDispatcher::\
713void ParallelMngDispatcher::\
714computeMinMaxSum(ConstArrayView<type> values, \
715 ArrayView<type> min_values, \
716 ArrayView<type> max_values, \
717 ArrayView<type> sum_values, \
718 ArrayView<Int32> min_ranks, \
719 ArrayView<Int32> max_ranks) \
721 Timer::Phase tphase(timeStats(),TP_Communication);\
722 field->computeMinMaxSum(values,min_values,max_values,sum_values,min_ranks,max_ranks);\
724void ParallelMngDispatcher::\
725scan(eReduceType rt,ArrayView<type> v)\
727 Timer::Phase tphase(timeStats(),TP_Communication);\
731ARCANE_PARALLEL_MANAGER_DISPATCH(m_char,
char)
732ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_char,
unsigned char)
733ARCANE_PARALLEL_MANAGER_DISPATCH(m_signed_char,
signed char)
734ARCANE_PARALLEL_MANAGER_DISPATCH(m_short,
short)
735ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_short,
unsigned short)
736ARCANE_PARALLEL_MANAGER_DISPATCH(m_int,
int)
737ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_int,
unsigned int)
738ARCANE_PARALLEL_MANAGER_DISPATCH(m_long,
long)
739ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_long,
unsigned long)
740ARCANE_PARALLEL_MANAGER_DISPATCH(m_long_long,
long long)
741ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_long_long,
unsigned long long)
742ARCANE_PARALLEL_MANAGER_DISPATCH(m_float,
float)
743ARCANE_PARALLEL_MANAGER_DISPATCH(m_double,
double)
744ARCANE_PARALLEL_MANAGER_DISPATCH(m_long_double,
long double)
745ARCANE_PARALLEL_MANAGER_DISPATCH(m_apreal,
APReal)
746ARCANE_PARALLEL_MANAGER_DISPATCH(m_real2,
Real2)
747ARCANE_PARALLEL_MANAGER_DISPATCH(m_real3,
Real3)
748ARCANE_PARALLEL_MANAGER_DISPATCH(m_real2x2,
Real2x2)
749ARCANE_PARALLEL_MANAGER_DISPATCH(m_real3x3,
Real3x3)
750ARCANE_PARALLEL_MANAGER_DISPATCH(m_hpreal,
HPReal)
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
#define ARCCORE_DEFINE_REFERENCE_COUNTED_CLASS(class_name)
Macro pour définir les méthodes et types une classe qui utilise un compteur de référence.
Informations pour créer une RunQueue.
File d'exécution pour un accélérateur.
Gestionnaire d'exécution pour accélérateur.
Classe implémentant un réel Haute Précision.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual Ref< Parallel::IRequestList > createRequestListRef()=0
Créé une liste de requêtes pour ce gestionnaire.
virtual ITimeMetricCollector * metricCollector()=0
Interface de collection associée.
Implémentation de Arccore::MessagePassing::IControlDispatcher.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Message de réception.
Ref< ISerializeMessageList > createSerializeMessageListRef() override
Créé une liste de messages de sérialisation.
ISerializeMessageList * createSerializeMessageList() final
Créé une liste pour gérer les 'ISerializeMessage'.
void setTimeStats(ITimeStats *ts) override
Positionne le gestionnaire de statistiques.
void allGather(ISerializer *send_serializer, ISerializer *recv_serializer) override
Redéfinit ici allGather pour éviter de cacher le symbole dans les classes dérivées.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
void processMessages(ConstArrayView< ISerializeMessage * > messages) override
Exécute les opérations des messages messages.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeMetricCollector * timeMetricCollector() const override
Collecteur Arccore des statistiques temporelles (peut être nul)
IParallelMng * createSubParallelMng(Int32ConstArrayView kept_ranks) final
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Test si une des requêtes rvalues est terminée.
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Ref< ISerializeMessageList > createSerializeMessageListRef() final
Créé une liste pour gérer les 'ISerializeMessage'.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Bloque en attendant qu'une des requêtes rvalues soit terminée.
void broadcastMemoryBuffer(ByteArray &bytes, Int32 rank) override
Effectue un broadcast d'une zone mémoire.
Classe gérant un vecteur de réel de dimension 2.
Classe gérant une matrice de réel de dimension 2x2.
Classe gérant un vecteur de réel de dimension 3.
Classe gérant une matrice de réel de dimension 3x3.
Encapsulation d'un pointeur qui se détruit automatiquement.
Positionne la phase de l'action en cours d'exécution.
Emulation de réel en précision arbitraire.
Int64 largeSize() const
Nombre d'éléments du vecteur (en 64 bits)
Vue modifiable d'un tableau d'un type T.
void fill(const T &o) noexcept
Remplit le tableau avec la valeur o.
constexpr Integer size() const noexcept
Retourne la taille du tableau.
Classe de base des vecteurs 1D de données.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Interface d'un sérialiseur.
Interface gérant les statistiques sur l'exécution.
Interface du conteneur des dispatchers.
Interface du gestionnaire des échanges de messages.
Interface d'un profiler pour les échanges de messages.
Interface des messages de sérialisation.
Interface d'une liste de messages de sérialisation.
Gestionnaire des échanges de messages.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
constexpr __host__ __device__ SizeType size() const noexcept
Retourne la taille du tableau.
Vue d'un tableau d'éléments de type T.
Chaîne de caractères unicode.
Span< const Byte > bytes() const
Retourne la conversion de l'instance dans l'encodage UTF-8.
Sentinelle pour collecter les informations temporelles.
Vecteur 1D de données avec sémantique par valeur (style STL).
RunQueue makeQueue(const Runner &runner)
Créé une file associée à runner.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
UniqueArray< Byte > ByteUniqueArray
Tableau dynamique à une dimension de caractères.
Espace de nommage contenant les types et déclarations qui gèrent le mécanisme de parallélisme par éch...
void mpAllGather(IMessagePassingMng *pm, const ISerializer *send_serializer, ISerializer *receive_serialize)
Message allGather() pour une sérialisation.
Espace de nom de Arccore.
Int32 Integer
Type représentant un entier.
std::int64_t Int64
Type entier signé sur 64 bits.
std::int32_t Int32
Type entier signé sur 32 bits.