14#include "arcane/utils/Array.h"
15#include "arcane/utils/HPReal.h"
16#include "arcane/utils/NumericTypes.h"
17#include "arcane/utils/NotImplementedException.h"
18#include "arcane/utils/ScopedPtr.h"
19#include "arcane/utils/ITraceMng.h"
20#include "arcane/utils/ValueConvert.h"
21#include "arcane/utils/FatalErrorException.h"
23#include "arcane/core/ParallelMngDispatcher.h"
24#include "arcane/core/IParallelDispatch.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ITimeStats.h"
27#include "arcane/core/IParallelNonBlockingCollective.h"
28#include "arcane/core/internal/IParallelMngInternal.h"
30#include "arcane/accelerator/core/Runner.h"
31#include "arcane/accelerator/core/RunQueueBuildInfo.h"
33#include "arccore/message_passing/Dispatchers.h"
35#include "arccore/message_passing/MessagePassingMng.h"
36#include "arccore/message_passing/IControlDispatcher.h"
37#include "arccore/message_passing/ISerializeDispatcher.h"
38#include "arccore/message_passing/PointToPointMessageInfo.h"
39#include "arccore/message_passing/IRequestList.h"
40#include "arccore/message_passing/ISerializeMessageList.h"
41#include "arccore/trace/TimeMetric.h"
48using namespace Arccore::MessagePassing;
53ParallelMngDispatcherBuildInfo::
56: m_comm_rank(mpm->commRank())
57, m_comm_size(mpm->commSize())
58, m_dispatchers(dispatchers)
59, m_message_passing_mng(mpm)
67ParallelMngDispatcherBuildInfo::
70: m_comm_rank(mpm_ref->commRank())
71, m_comm_size(mpm_ref->commSize())
72, m_dispatchers(dispatchers.get())
73, m_dispatchers_ref(dispatchers)
74, m_message_passing_mng(mpm_ref.get())
75, m_message_passing_mng_ref(mpm_ref)
83ParallelMngDispatcherBuildInfo::
84ParallelMngDispatcherBuildInfo(Int32 comm_rank,Int32 comm_size)
85: m_comm_rank(comm_rank)
86, m_comm_size(comm_size)
87, m_dispatchers(nullptr)
88, m_message_passing_mng(nullptr)
96void ParallelMngDispatcherBuildInfo::
101 m_dispatchers = m_dispatchers_ref.get();
103 if (!m_message_passing_mng){
104 auto* x =
new MP::MessagePassingMng(m_comm_rank,m_comm_size,m_dispatchers);
105 m_message_passing_mng = x;
106 m_message_passing_mng_ref =
makeRef(x);
108 if (!m_message_passing_mng_ref.get())
109 m_message_passing_mng_ref =
makeRef(m_message_passing_mng);
115ParallelMngDispatcher::DefaultControlDispatcher::
116DefaultControlDispatcher(IParallelMng* pm)
122void ParallelMngDispatcher::DefaultControlDispatcher::
125 m_parallel_mng->waitAllRequests(requests);
128void ParallelMngDispatcher::DefaultControlDispatcher::
130 bool is_non_blocking)
134 done_requests = m_parallel_mng->testSomeRequests(requests);
136 done_requests = m_parallel_mng->waitSomeRequests(requests);
138 for(
int x : done_requests )
149void ParallelMngDispatcher::DefaultControlDispatcher::
152 m_parallel_mng->barrier();
155Request ParallelMngDispatcher::DefaultControlDispatcher::
158 return m_parallel_mng->nonBlockingCollective()->barrier();
161MessageId ParallelMngDispatcher::DefaultControlDispatcher::
164 return m_parallel_mng->probe(message);
170 return m_parallel_mng->legacyProbe(message);
174createRequestListRef()
176 return m_parallel_mng->createRequestListRef();
182void ParallelMngDispatcher::DefaultControlDispatcher::
183setProfiler(MP::IProfiler* p)
193:
public MP::ISerializeDispatcher
204 return m_parallel_mng->createSerializeMessageListRef();
208 return m_parallel_mng->sendSerializer(s,message);
212 return m_parallel_mng->receiveSerializer(s,message);
222:
public IParallelMngInternal
226 explicit Impl(ParallelMngDispatcher* pm)
232 m_is_accelerator_aware_disabled = (v.value()!=0);
239 Runner runner()
const override {
return m_runner; }
240 RunQueue queue()
const override {
return m_queue; }
241 bool isAcceleratorAware()
const override
243 if (m_is_accelerator_aware_disabled)
245 if (m_queue.isNull())
247 if (!m_queue.isAcceleratorPolicy())
249 return m_parallel_mng->_isAcceleratorAware();
251 void setDefaultRunner(
const Runner& runner)
override
253 if (!m_runner.isInitialized())
263 m_queue =
makeQueue(m_runner,build_info);
264 m_queue.setAsync(
true);
268 return m_parallel_mng->_createSubParallelMngRef(color, key);
273 ParallelMngDispatcher* m_parallel_mng =
nullptr;
276 bool m_is_accelerator_aware_disabled =
false;
285ParallelMngDispatcher::
288, m_unsigned_char(nullptr)
289, m_signed_char(nullptr)
291, m_unsigned_short(nullptr)
293, m_unsigned_int(nullptr)
295, m_unsigned_long(nullptr)
296, m_long_long(nullptr)
297, m_unsigned_long_long(nullptr)
300, m_long_double(nullptr)
307, m_time_stats(nullptr)
308, m_mp_dispatchers_ref(bi.dispatchersRef())
309, m_message_passing_mng_ref(bi.messagePassingMngRef())
312, m_parallel_mng_internal(new
Impl(this))
319ParallelMngDispatcher::
320~ParallelMngDispatcher()
322 m_mp_dispatchers_ref.
reset();
324 delete m_parallel_mng_internal;
326 delete m_serialize_dispatcher;
327 delete m_control_dispatcher;
329 delete m_signed_char;
330 delete m_unsigned_char;
332 delete m_unsigned_short;
334 delete m_unsigned_int;
336 delete m_unsigned_long;
338 delete m_unsigned_long_long;
342 delete m_long_double;
353void ParallelMngDispatcher::
354_setControlDispatcher(MP::IControlDispatcher* d)
356 delete m_control_dispatcher;
357 m_control_dispatcher = d;
363void ParallelMngDispatcher::
364_setSerializeDispatcher(MP::ISerializeDispatcher* d)
366 delete m_serialize_dispatcher;
367 m_serialize_dispatcher = d;
373void ParallelMngDispatcher::
374_setArccoreDispatchers()
376 m_mp_dispatchers_ref->setDispatcher(m_char->toArccoreDispatcher());
377 m_mp_dispatchers_ref->setDispatcher(m_signed_char->toArccoreDispatcher());
378 m_mp_dispatchers_ref->setDispatcher(m_unsigned_char->toArccoreDispatcher());
379 m_mp_dispatchers_ref->setDispatcher(m_short->toArccoreDispatcher());
380 m_mp_dispatchers_ref->setDispatcher(m_unsigned_short->toArccoreDispatcher());
381 m_mp_dispatchers_ref->setDispatcher(m_int->toArccoreDispatcher());
382 m_mp_dispatchers_ref->setDispatcher(m_unsigned_int->toArccoreDispatcher());
383 m_mp_dispatchers_ref->setDispatcher(m_long->toArccoreDispatcher());
384 m_mp_dispatchers_ref->setDispatcher(m_unsigned_long->toArccoreDispatcher());
385 m_mp_dispatchers_ref->setDispatcher(m_long_long->toArccoreDispatcher());
386 m_mp_dispatchers_ref->setDispatcher(m_unsigned_long_long->toArccoreDispatcher());
387 m_mp_dispatchers_ref->setDispatcher(m_float->toArccoreDispatcher());
388 m_mp_dispatchers_ref->setDispatcher(m_double->toArccoreDispatcher());
389 m_mp_dispatchers_ref->setDispatcher(m_long_double->toArccoreDispatcher());
392 m_mp_dispatchers_ref->setDispatcher(m_control_dispatcher);
394 m_mp_dispatchers_ref->setDispatcher(m_serialize_dispatcher);
403 return m_message_passing_mng_ref.get();
428 _messagePassingMng()->setTimeMetricCollector(c);
436_communicationTimeMetricAction()
const
438 return Timer::phaseAction(
timeStats(),TP_Communication);
444void ParallelMngDispatcher::
445broadcastString(
String& str,Int32 rank)
451 len_info[0] = bytes.
size();
454 broadcast(utf8_array,rank);
459 broadcast(utf8_array,rank);
460 str = String::fromUtf8(utf8_array);
481 broadcast(bytes,rank);
491 mpAllGather(_messagePassingMng(), send_serializer, recv_serializer);
504 for(
Integer i=0; i<nb_message; ++i ){
506 message_list->addMessage(m);
508 message_list->processPendingMessages();
510 message_list->waitMessages(Parallel::WaitAll);
519 return makeRef(_createSerializeMessageList());
528 return _createSerializeMessageList();
537 return _createSubParallelMng(kept_ranks);
546 return makeRef(_createSubParallelMng(kept_ranks));
553_createSubParallelMngRef([[maybe_unused]]
Int32 color, [[maybe_unused]]
Int32 key)
564 Timer::Phase tphase(
timeStats(),TP_Communication);
567 request_list->add(requests);
568 request_list->wait(wait_type);
570 for (
Integer i=0; i<nb_request; ++i )
571 requests[i] = request_list->request(i);
572 return request_list->doneRequestIndexes();
596#define ARCANE_PARALLEL_MANAGER_DISPATCH(field,type)\
597void ParallelMngDispatcher::\
598allGather(ConstArrayView<type> send_buf,ArrayView<type> recv_buf)\
600 Timer::Phase tphase(timeStats(),TP_Communication);\
601 field->allGather(send_buf,recv_buf);\
603void ParallelMngDispatcher::\
604gather(ConstArrayView<type> send_buf,ArrayView<type> recv_buf,Integer rank) \
606 Timer::Phase tphase(timeStats(),TP_Communication);\
607 field->gather(send_buf,recv_buf,rank); \
609void ParallelMngDispatcher::\
610allGatherVariable(ConstArrayView<type> send_buf,Array<type>& recv_buf)\
612 Timer::Phase tphase(timeStats(),TP_Communication);\
613 field->allGatherVariable(send_buf,recv_buf);\
615void ParallelMngDispatcher::\
616gatherVariable(ConstArrayView<type> send_buf,Array<type>& recv_buf,Integer rank) \
618 Timer::Phase tphase(timeStats(),TP_Communication);\
619 field->gatherVariable(send_buf,recv_buf,rank); \
621void ParallelMngDispatcher::\
622scatterVariable(ConstArrayView<type> send_buf,ArrayView<type> recv_buf,Integer root)\
624 Timer::Phase tphase(timeStats(),TP_Communication);\
625 field->scatterVariable(send_buf,recv_buf,root);\
627type ParallelMngDispatcher::\
628reduce(eReduceType rt,type v)\
630 Timer::Phase tphase(timeStats(),TP_Communication);\
631 return field->allReduce(rt,v);\
633void ParallelMngDispatcher::\
634reduce(eReduceType rt,ArrayView<type> v)\
636 Timer::Phase tphase(timeStats(),TP_Communication);\
637 field->allReduce(rt,v);\
639void ParallelMngDispatcher::\
640broadcast(ArrayView<type> send_buf,Integer id)\
642 Timer::Phase tphase(timeStats(),TP_Communication);\
643 field->broadcast(send_buf,id);\
645void ParallelMngDispatcher::\
646send(ConstArrayView<type> values,Integer id)\
648 Timer::Phase tphase(timeStats(),TP_Communication);\
649 field->send(values,id);\
651void ParallelMngDispatcher::\
652recv(ArrayView<type> values,Integer id)\
654 Timer::Phase tphase(timeStats(),TP_Communication);\
655 field->recv(values,id);\
657Parallel::Request ParallelMngDispatcher::\
658send(ConstArrayView<type> values,Integer id,bool is_blocked)\
660 Timer::Phase tphase(timeStats(),TP_Communication);\
661 return field->send(values,id,is_blocked);\
663Request ParallelMngDispatcher::\
664send(Span<const type> values,const PointToPointMessageInfo& message) \
666 Timer::Phase tphase(timeStats(),TP_Communication);\
667 return field->send(values,message);\
669Parallel::Request ParallelMngDispatcher::\
670recv(ArrayView<type> values,Integer id,bool is_blocked)\
672 Timer::Phase tphase(timeStats(),TP_Communication);\
673 return field->recv(values,id,is_blocked);\
675Request ParallelMngDispatcher::\
676receive(Span<type> values,const PointToPointMessageInfo& message) \
678 Timer::Phase tphase(timeStats(),TP_Communication);\
679 return field->receive(values,message);\
681void ParallelMngDispatcher::\
682sendRecv(ConstArrayView<type> send_buf,ArrayView<type> recv_buf,Integer id)\
684 Timer::Phase tphase(timeStats(),TP_Communication);\
685 field->sendRecv(send_buf,recv_buf,id);\
687void ParallelMngDispatcher::\
688allToAll(ConstArrayView<type> send_buf,ArrayView<type> recv_buf,Integer count)\
690 Timer::Phase tphase(timeStats(),TP_Communication);\
691 field->allToAll(send_buf,recv_buf,count);\
693void ParallelMngDispatcher::\
694allToAllVariable(ConstArrayView<type> send_buf,Int32ConstArrayView send_count,\
695 Int32ConstArrayView send_index,ArrayView<type> recv_buf,\
696 Int32ConstArrayView recv_count,Int32ConstArrayView recv_index)\
698 Timer::Phase tphase(timeStats(),TP_Communication);\
699 field->allToAllVariable(send_buf,send_count,send_index,recv_buf,recv_count,recv_index);\
701type ParallelMngDispatcher::\
702scan(eReduceType rt,type v)\
704 Timer::Phase tphase(timeStats(),TP_Communication);\
705 return field->scan(rt,v);\
707void ParallelMngDispatcher::\
708computeMinMaxSum(type val,type& min_val,type& max_val,type& sum_val,Int32& min_proc,Int32& max_proc)\
710 Timer::Phase tphase(timeStats(),TP_Communication);\
711 field->computeMinMaxSum(val,min_val,max_val,sum_val,min_proc,max_proc);\
713IParallelDispatchT<type>* ParallelMngDispatcher::\
718void ParallelMngDispatcher::\
719computeMinMaxSum(ConstArrayView<type> values, \
720 ArrayView<type> min_values, \
721 ArrayView<type> max_values, \
722 ArrayView<type> sum_values, \
723 ArrayView<Int32> min_ranks, \
724 ArrayView<Int32> max_ranks) \
726 Timer::Phase tphase(timeStats(),TP_Communication);\
727 field->computeMinMaxSum(values,min_values,max_values,sum_values,min_ranks,max_ranks);\
729void ParallelMngDispatcher::\
730scan(eReduceType rt,ArrayView<type> v)\
732 Timer::Phase tphase(timeStats(),TP_Communication);\
736ARCANE_PARALLEL_MANAGER_DISPATCH(m_char,
char)
737ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_char,
unsigned char)
738ARCANE_PARALLEL_MANAGER_DISPATCH(m_signed_char,
signed char)
739ARCANE_PARALLEL_MANAGER_DISPATCH(m_short,
short)
740ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_short,
unsigned short)
741ARCANE_PARALLEL_MANAGER_DISPATCH(m_int,
int)
742ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_int,
unsigned int)
743ARCANE_PARALLEL_MANAGER_DISPATCH(m_long,
long)
744ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_long,
unsigned long)
745ARCANE_PARALLEL_MANAGER_DISPATCH(m_long_long,
long long)
746ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_long_long,
unsigned long long)
747ARCANE_PARALLEL_MANAGER_DISPATCH(m_float,
float)
748ARCANE_PARALLEL_MANAGER_DISPATCH(m_double,
double)
749ARCANE_PARALLEL_MANAGER_DISPATCH(m_long_double,
long double)
750ARCANE_PARALLEL_MANAGER_DISPATCH(m_apreal,
APReal)
751ARCANE_PARALLEL_MANAGER_DISPATCH(m_real2,
Real2)
752ARCANE_PARALLEL_MANAGER_DISPATCH(m_real3,
Real3)
753ARCANE_PARALLEL_MANAGER_DISPATCH(m_real2x2,
Real2x2)
754ARCANE_PARALLEL_MANAGER_DISPATCH(m_real3x3,
Real3x3)
755ARCANE_PARALLEL_MANAGER_DISPATCH(m_hpreal,
HPReal)
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Liste des fonctions d'échange de message.
#define ARCCORE_DEFINE_REFERENCE_COUNTED_CLASS(class_name)
Macro pour définir les méthodes et types une classe qui utilise un compteur de référence.
Emulation de réel en précision arbitraire.
Int64 largeSize() const
Nombre d'éléments du vecteur (en 64 bits)
Informations pour créer une RunQueue.
File d'exécution pour un accélérateur.
Gestionnaire d'exécution pour accélérateur.
Vue modifiable d'un tableau d'un type T.
void fill(const T &o) noexcept
Remplit le tableau avec la valeur o.
constexpr Integer size() const noexcept
Retourne la taille du tableau.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Classe implémentant un réel Haute Précision.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual Ref< Parallel::IRequestList > createRequestListRef()=0
Créé une liste de requêtes pour ce gestionnaire.
Interface d'un sérialiseur.
Interface gérant les statistiques sur l'exécution.
virtual ITimeMetricCollector * metricCollector()=0
Interface de collection associée.
Interface du conteneur des dispatchers.
Interface du gestionnaire des échanges de messages.
Interface d'une liste de messages de sérialisation.
Gestionnaire des échanges de messages.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
Implémentation de Arccore::MessagePassing::IControlDispatcher.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Message d'envoi.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Message de réception.
Ref< ISerializeMessageList > createSerializeMessageListRef() override
Créé une liste de messages de sérialisation.
ISerializeMessageList * createSerializeMessageList() final
Créé une liste pour gérer les 'ISerializeMessage'.
void setTimeStats(ITimeStats *ts) override
Positionne le gestionnaire de statistiques.
void allGather(ISerializer *send_serializer, ISerializer *recv_serializer) override
Redéfinit ici allGather pour éviter de cacher le symbole dans les classes dérivées.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
void processMessages(ConstArrayView< ISerializeMessage * > messages) override
Exécute les opérations des messages messages.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
ITimeMetricCollector * timeMetricCollector() const override
Collecteur Arccore des statistiques temporelles (peut être nul)
IParallelMng * createSubParallelMng(Int32ConstArrayView kept_ranks) final
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Test si une des requêtes rvalues est terminée.
ITimeStats * timeStats() const override
Gestionnaire de statistiques associé (peut être nul)
Ref< ISerializeMessageList > createSerializeMessageListRef() final
Créé une liste pour gérer les 'ISerializeMessage'.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Bloque en attendant qu'une des requêtes rvalues soit terminée.
void broadcastMemoryBuffer(ByteArray &bytes, Int32 rank) override
Effectue un broadcast d'une zone mémoire.
Classe gérant un vecteur de réel de dimension 2.
Classe gérant une matrice de réel de dimension 2x2.
Classe gérant un vecteur de réel de dimension 3.
Classe gérant une matrice de réel de dimension 3x3.
Référence à une instance.
Encapsulation d'un pointeur qui se détruit automatiquement.
constexpr __host__ __device__ SizeType size() const noexcept
Retourne la taille du tableau.
Vue d'un tableau d'éléments de type T.
Chaîne de caractères unicode.
Span< const Byte > bytes() const
Retourne la conversion de l'instance dans l'encodage UTF-8.
Sentinelle pour collecter les informations temporelles.
Positionne la phase de l'action en cours d'exécution.
Vecteur 1D de données avec sémantique par valeur (style STL).
void reset()
Positionne l'instance au pointeur nul.
RunQueue makeQueue(const Runner &runner)
Créé une file associée à runner.
@ Sequential
Politique d'exécution séquentielle.
@ WaitSome
Attend que tous les messages de la liste soient traités.
void mpAllGather(IMessagePassingMng *pm, const ISerializer *send_serializer, ISerializer *receive_serialize)
Message allGather() pour une sérialisation.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
UniqueArray< Byte > ByteUniqueArray
Tableau dynamique à une dimension de caractères.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Espace de nom de Arccore.
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.