14#include "arcane/utils/Array.h"
15#include "arcane/utils/HPReal.h"
16#include "arcane/utils/NumericTypes.h"
17#include "arcane/utils/NotImplementedException.h"
18#include "arcane/utils/ScopedPtr.h"
19#include "arcane/utils/ITraceMng.h"
20#include "arcane/utils/ValueConvert.h"
21#include "arcane/utils/FatalErrorException.h"
23#include "arcane/core/ParallelMngDispatcher.h"
24#include "arcane/core/IParallelDispatch.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ITimeStats.h"
27#include "arcane/core/IParallelNonBlockingCollective.h"
28#include "arcane/core/internal/ParallelMngInternal.h"
30#include "arcane/accelerator/core/Runner.h"
31#include "arcane/accelerator/core/RunQueueBuildInfo.h"
33#include "arccore/message_passing/Dispatchers.h"
35#include "arccore/message_passing/MessagePassingMng.h"
36#include "arccore/message_passing/IControlDispatcher.h"
37#include "arccore/message_passing/ISerializeDispatcher.h"
38#include "arccore/message_passing/PointToPointMessageInfo.h"
39#include "arccore/message_passing/IRequestList.h"
40#include "arccore/message_passing/ISerializeMessageList.h"
41#include "arccore/trace/internal/TimeMetric.h"
48using namespace Arccore::MessagePassing;
53ParallelMngDispatcherBuildInfo::
56: m_comm_rank(mpm->commRank())
57, m_comm_size(mpm->commSize())
58, m_dispatchers(dispatchers)
59, m_message_passing_mng(mpm)
67ParallelMngDispatcherBuildInfo::
70: m_comm_rank(mpm_ref->commRank())
71, m_comm_size(mpm_ref->commSize())
72, m_dispatchers(dispatchers.get())
73, m_dispatchers_ref(dispatchers)
74, m_message_passing_mng(mpm_ref.get())
75, m_message_passing_mng_ref(mpm_ref)
83ParallelMngDispatcherBuildInfo::
84ParallelMngDispatcherBuildInfo(Int32 comm_rank, Int32 comm_size)
85: m_comm_rank(comm_rank)
86, m_comm_size(comm_size)
87, m_dispatchers(nullptr)
88, m_message_passing_mng(nullptr)
96void ParallelMngDispatcherBuildInfo::
101 m_dispatchers = m_dispatchers_ref.get();
103 if (!m_message_passing_mng) {
104 auto* x =
new MP::MessagePassingMng(m_comm_rank, m_comm_size, m_dispatchers);
105 m_message_passing_mng = x;
106 m_message_passing_mng_ref =
makeRef(x);
108 if (!m_message_passing_mng_ref.get())
109 m_message_passing_mng_ref =
makeRef(m_message_passing_mng);
115ParallelMngDispatcher::DefaultControlDispatcher::
116DefaultControlDispatcher(IParallelMng* pm)
121void ParallelMngDispatcher::DefaultControlDispatcher::
124 m_parallel_mng->waitAllRequests(requests);
127void ParallelMngDispatcher::DefaultControlDispatcher::
129 bool is_non_blocking)
133 done_requests = m_parallel_mng->testSomeRequests(requests);
135 done_requests = m_parallel_mng->waitSomeRequests(requests);
137 for (
int x : done_requests)
148void ParallelMngDispatcher::DefaultControlDispatcher::
151 m_parallel_mng->barrier();
154Request ParallelMngDispatcher::DefaultControlDispatcher::
157 return m_parallel_mng->nonBlockingCollective()->barrier();
160MessageId ParallelMngDispatcher::DefaultControlDispatcher::
163 return m_parallel_mng->probe(message);
169 return m_parallel_mng->legacyProbe(message);
173createRequestListRef()
175 return m_parallel_mng->createRequestListRef();
181void ParallelMngDispatcher::DefaultControlDispatcher::
182setProfiler(MP::IProfiler* p)
192:
public MP::ISerializeDispatcher
204 return m_parallel_mng->createSerializeMessageListRef();
208 return m_parallel_mng->sendSerializer(s, message);
212 return m_parallel_mng->receiveSerializer(s, message);
226ParallelMngDispatcher::
229, m_unsigned_char(nullptr)
230, m_signed_char(nullptr)
232, m_unsigned_short(nullptr)
234, m_unsigned_int(nullptr)
236, m_unsigned_long(nullptr)
237, m_long_long(nullptr)
238, m_unsigned_long_long(nullptr)
241, m_long_double(nullptr)
248, m_time_stats(nullptr)
249, m_mp_dispatchers_ref(bi.dispatchersRef())
250, m_message_passing_mng_ref(bi.messagePassingMngRef())
251, m_control_dispatcher(new DefaultControlDispatcher(this))
252, m_serialize_dispatcher(new SerializeDispatcher(this))
253, m_parallel_mng_internal(new ParallelMngInternal(this))
260ParallelMngDispatcher::
261~ParallelMngDispatcher()
263 m_mp_dispatchers_ref.
reset();
265 delete m_parallel_mng_internal;
267 delete m_serialize_dispatcher;
268 delete m_control_dispatcher;
270 delete m_signed_char;
271 delete m_unsigned_char;
273 delete m_unsigned_short;
275 delete m_unsigned_int;
277 delete m_unsigned_long;
279 delete m_unsigned_long_long;
283 delete m_long_double;
294void ParallelMngDispatcher::
295_setControlDispatcher(MP::IControlDispatcher* d)
297 delete m_control_dispatcher;
298 m_control_dispatcher = d;
304void ParallelMngDispatcher::
305_setSerializeDispatcher(MP::ISerializeDispatcher* d)
307 delete m_serialize_dispatcher;
308 m_serialize_dispatcher = d;
314void ParallelMngDispatcher::
315_setArccoreDispatchers()
317 m_mp_dispatchers_ref->setDispatcher(m_char->toArccoreDispatcher());
318 m_mp_dispatchers_ref->setDispatcher(m_signed_char->toArccoreDispatcher());
319 m_mp_dispatchers_ref->setDispatcher(m_unsigned_char->toArccoreDispatcher());
320 m_mp_dispatchers_ref->setDispatcher(m_short->toArccoreDispatcher());
321 m_mp_dispatchers_ref->setDispatcher(m_unsigned_short->toArccoreDispatcher());
322 m_mp_dispatchers_ref->setDispatcher(m_int->toArccoreDispatcher());
323 m_mp_dispatchers_ref->setDispatcher(m_unsigned_int->toArccoreDispatcher());
324 m_mp_dispatchers_ref->setDispatcher(m_long->toArccoreDispatcher());
325 m_mp_dispatchers_ref->setDispatcher(m_unsigned_long->toArccoreDispatcher());
326 m_mp_dispatchers_ref->setDispatcher(m_long_long->toArccoreDispatcher());
327 m_mp_dispatchers_ref->setDispatcher(m_unsigned_long_long->toArccoreDispatcher());
328 m_mp_dispatchers_ref->setDispatcher(m_float->toArccoreDispatcher());
329 m_mp_dispatchers_ref->setDispatcher(m_double->toArccoreDispatcher());
330 m_mp_dispatchers_ref->setDispatcher(m_long_double->toArccoreDispatcher());
333 m_mp_dispatchers_ref->setDispatcher(m_control_dispatcher);
335 m_mp_dispatchers_ref->setDispatcher(m_serialize_dispatcher);
344 return m_message_passing_mng_ref.get();
353 ITimeMetricCollector* c =
nullptr;
369 _messagePassingMng()->setTimeMetricCollector(c);
376TimeMetricAction ParallelMngDispatcher::
377_communicationTimeMetricAction()
const
379 return Timer::phaseAction(
timeStats(), TP_Communication);
385void ParallelMngDispatcher::
386broadcastString(
String& str, Int32 rank)
391 if (rank == my_rank) {
392 len_info[0] = bytes.
size();
395 broadcast(utf8_array, rank);
400 broadcast(utf8_array, rank);
401 str = String::fromUtf8(utf8_array);
422 broadcast(bytes, rank);
432 mpAllGather(_messagePassingMng(), send_serializer, recv_serializer);
441 TimeMetricSentry tphase(Timer::phaseAction(
timeStats(), TP_Communication));
445 message_list->addMessage(m);
447 message_list->waitMessages(Parallel::WaitAll);
456 TimeMetricSentry tphase(Timer::phaseAction(
timeStats(), TP_Communication));
460 message_list->addMessage(v.get());
462 message_list->waitMessages(Parallel::WaitAll);
471 return makeRef(_createSerializeMessageList());
480 return _createSerializeMessageList();
489 return _createSubParallelMng(kept_ranks);
498 return makeRef(_createSubParallelMng(kept_ranks));
505_createSubParallelMngRef([[maybe_unused]]
Int32 color, [[maybe_unused]]
Int32 key)
516 Timer::Phase tphase(
timeStats(), TP_Communication);
519 request_list->add(requests);
520 request_list->wait(wait_type);
522 for (
Integer i = 0; i < nb_request; ++i)
523 requests[i] = request_list->request(i);
524 return request_list->doneRequestIndexes();
548#define ARCANE_PARALLEL_MANAGER_DISPATCH(field, type) \
549 void ParallelMngDispatcher:: \
550 allGather(ConstArrayView<type> send_buf, ArrayView<type> recv_buf) \
552 Timer::Phase tphase(timeStats(), TP_Communication); \
553 (field)->allGather(send_buf, recv_buf); \
555 void ParallelMngDispatcher:: \
556 gather(ConstArrayView<type> send_buf, ArrayView<type> recv_buf, Integer rank) \
558 Timer::Phase tphase(timeStats(), TP_Communication); \
559 (field)->gather(send_buf, recv_buf, rank); \
561 void ParallelMngDispatcher:: \
562 allGatherVariable(ConstArrayView<type> send_buf, Array<type>& recv_buf) \
564 Timer::Phase tphase(timeStats(), TP_Communication); \
565 (field)->allGatherVariable(send_buf, recv_buf); \
567 void ParallelMngDispatcher:: \
568 gatherVariable(ConstArrayView<type> send_buf, Array<type>& recv_buf, Integer rank) \
570 Timer::Phase tphase(timeStats(), TP_Communication); \
571 (field)->gatherVariable(send_buf, recv_buf, rank); \
573 void ParallelMngDispatcher:: \
574 scatterVariable(ConstArrayView<type> send_buf, ArrayView<type> recv_buf, Integer root) \
576 Timer::Phase tphase(timeStats(), TP_Communication); \
577 (field)->scatterVariable(send_buf, recv_buf, root); \
579 type ParallelMngDispatcher:: \
580 reduce(eReduceType rt, type v) \
582 Timer::Phase tphase(timeStats(), TP_Communication); \
583 return (field)->allReduce(rt, v); \
585 void ParallelMngDispatcher:: \
586 reduce(eReduceType rt, ArrayView<type> v) \
588 Timer::Phase tphase(timeStats(), TP_Communication); \
589 (field)->allReduce(rt, v); \
591 void ParallelMngDispatcher:: \
592 broadcast(ArrayView<type> send_buf, Integer id) \
594 Timer::Phase tphase(timeStats(), TP_Communication); \
595 (field)->broadcast(send_buf, id); \
597 void ParallelMngDispatcher:: \
598 send(ConstArrayView<type> values, Integer id) \
600 Timer::Phase tphase(timeStats(), TP_Communication); \
601 (field)->send(values, id); \
603 void ParallelMngDispatcher:: \
604 recv(ArrayView<type> values, Integer id) \
606 Timer::Phase tphase(timeStats(), TP_Communication); \
607 (field)->recv(values, id); \
609 Parallel::Request ParallelMngDispatcher:: \
610 send(ConstArrayView<type> values, Integer id, bool is_blocked) \
612 Timer::Phase tphase(timeStats(), TP_Communication); \
613 return (field)->send(values, id, is_blocked); \
615 Request ParallelMngDispatcher:: \
616 send(Span<const type> values, const PointToPointMessageInfo& message) \
618 Timer::Phase tphase(timeStats(), TP_Communication); \
619 return (field)->send(values, message); \
621 Parallel::Request ParallelMngDispatcher:: \
622 recv(ArrayView<type> values, Integer id, bool is_blocked) \
624 Timer::Phase tphase(timeStats(), TP_Communication); \
625 return (field)->recv(values, id, is_blocked); \
627 Request ParallelMngDispatcher:: \
628 receive(Span<type> values, const PointToPointMessageInfo& message) \
630 Timer::Phase tphase(timeStats(), TP_Communication); \
631 return (field)->receive(values, message); \
633 void ParallelMngDispatcher:: \
634 sendRecv(ConstArrayView<type> send_buf, ArrayView<type> recv_buf, Integer id) \
636 Timer::Phase tphase(timeStats(), TP_Communication); \
637 (field)->sendRecv(send_buf, recv_buf, id); \
639 void ParallelMngDispatcher:: \
640 allToAll(ConstArrayView<type> send_buf, ArrayView<type> recv_buf, Integer count) \
642 Timer::Phase tphase(timeStats(), TP_Communication); \
643 (field)->allToAll(send_buf, recv_buf, count); \
645 void ParallelMngDispatcher:: \
646 allToAllVariable(ConstArrayView<type> send_buf, Int32ConstArrayView send_count, \
647 Int32ConstArrayView send_index, ArrayView<type> recv_buf, \
648 Int32ConstArrayView recv_count, Int32ConstArrayView recv_index) \
650 Timer::Phase tphase(timeStats(), TP_Communication); \
651 (field)->allToAllVariable(send_buf, send_count, send_index, recv_buf, recv_count, recv_index); \
653 type ParallelMngDispatcher:: \
654 scan(eReduceType rt, type v) \
656 Timer::Phase tphase(timeStats(), TP_Communication); \
657 return (field)->scan(rt, v); \
659 void ParallelMngDispatcher:: \
660 computeMinMaxSum(type val, type& min_val, type& max_val, type& sum_val, Int32& min_proc, Int32& max_proc) \
662 Timer::Phase tphase(timeStats(), TP_Communication); \
663 (field)->computeMinMaxSum(val, min_val, max_val, sum_val, min_proc, max_proc); \
665 IParallelDispatchT<type>* ParallelMngDispatcher:: \
670 void ParallelMngDispatcher:: \
671 computeMinMaxSum(ConstArrayView<type> values, \
672 ArrayView<type> min_values, \
673 ArrayView<type> max_values, \
674 ArrayView<type> sum_values, \
675 ArrayView<Int32> min_ranks, \
676 ArrayView<Int32> max_ranks) \
678 Timer::Phase tphase(timeStats(), TP_Communication); \
679 (field)->computeMinMaxSum(values, min_values, max_values, sum_values, min_ranks, max_ranks); \
681 void ParallelMngDispatcher:: \
682 scan(eReduceType rt, ArrayView<type> v) \
684 Timer::Phase tphase(timeStats(), TP_Communication); \
685 (field)->scan(rt, v); \
688ARCANE_PARALLEL_MANAGER_DISPATCH(m_char,
char)
689ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_char,
unsigned char)
690ARCANE_PARALLEL_MANAGER_DISPATCH(m_signed_char,
signed char)
691ARCANE_PARALLEL_MANAGER_DISPATCH(m_short,
short)
692ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_short,
unsigned short)
693ARCANE_PARALLEL_MANAGER_DISPATCH(m_int,
int)
694ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_int,
unsigned int)
695ARCANE_PARALLEL_MANAGER_DISPATCH(m_long,
long)
696ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_long,
unsigned long)
697ARCANE_PARALLEL_MANAGER_DISPATCH(m_long_long,
long long)
698ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_long_long,
unsigned long long)
699ARCANE_PARALLEL_MANAGER_DISPATCH(m_float,
float)
700ARCANE_PARALLEL_MANAGER_DISPATCH(m_double,
double)
701ARCANE_PARALLEL_MANAGER_DISPATCH(m_long_double,
long double)
702ARCANE_PARALLEL_MANAGER_DISPATCH(m_apreal,
APReal)
703ARCANE_PARALLEL_MANAGER_DISPATCH(m_real2,
Real2)
704ARCANE_PARALLEL_MANAGER_DISPATCH(m_real3,
Real3)
705ARCANE_PARALLEL_MANAGER_DISPATCH(m_real2x2,
Real2x2)
706ARCANE_PARALLEL_MANAGER_DISPATCH(m_real3x3,
Real3x3)
707ARCANE_PARALLEL_MANAGER_DISPATCH(m_hpreal,
HPReal)
#define ARCANE_CHECK_POINTER(ptr)
Macro returning the pointer ptr if it is not null or throwing an exception if it is null.
#define ARCANE_THROW(exception_class,...)
Macro for throwing an exception with formatting.
Brief list of message exchange functions.
#define ARCCORE_DEFINE_REFERENCE_COUNTED_CLASS(class_name)
Macro to define methods and types for a class that uses a reference counter.
Emulation of real number in arbitrary precision.
Int64 largeSize() const
Number of elements in the vector (in 64 bits).
Modifiable view of an array of type T.
void fill(const T &o) noexcept
Fills the array with the value o.
constexpr Integer size() const noexcept
Returns the size of the array.
void resize(Int64 s)
Changes the number of elements in the array to s.
Constant view of an array of type T.
Class implementing a High-Precision real number.
Interface of the parallelism manager for a subdomain.
virtual Int32 commRank() const =0
Rank of this instance in the communicator.
virtual Ref< Parallel::IRequestList > createRequestListRef()=0
Creates a request list for this manager.
virtual ITimeMetricCollector * metricCollector()=0
Associated collection interface.
Interface of the dispatcher container.
Interface of the message passing manager.
Interface for a serialization message list.
Information about the source of a message.
Information for sending/receiving a point-to-point message.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Sending message.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Receiving message.
Ref< ISerializeMessageList > createSerializeMessageListRef() override
Create a list of serialization messages.
ISerializeMessageList * createSerializeMessageList() final
Creates a list to manage 'ISerializeMessage'.
void setTimeStats(ITimeStats *ts) override
Sets the statistics manager.
void allGather(ISerializer *send_serializer, ISerializer *recv_serializer) override
Redefines allGather here to avoid hiding the symbol in derived classes.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Creates a new parallelism manager for a subset of ranks.
void processMessages(ConstArrayView< ISerializeMessage * > messages) override
Executes the operations of messages messages.
IMessagePassingMng * messagePassingMng() const override
Associated Arccore message passing manager.
ITimeMetricCollector * timeMetricCollector() const override
Arccore temporal statistics collector (can be null).
IParallelMng * createSubParallelMng(Int32ConstArrayView kept_ranks) final
Creates a new parallelism manager for a subset of ranks.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Tests if one of the rvalues requests is complete.
ITimeStats * timeStats() const override
Associated statistics manager (can be null).
Ref< ISerializeMessageList > createSerializeMessageListRef() final
Creates a list to manage 'ISerializeMessage'.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Blocks while waiting for one of the rvalues requests to complete.
void broadcastMemoryBuffer(ByteArray &bytes, Int32 rank) override
Performs a broadcast of a memory region.
Class managing a 2-dimensional real vector.
Class managing a 2x2 matrix of reals.
Class managing a 3-dimensional real vector.
Class managing a 3x3 real matrix.
void reset()
Positions the instance to the null pointer.
Reference to an instance.
constexpr __host__ __device__ SizeType size() const noexcept
Returns the size of the array.
View of an array of elements of type T.
Unicode character string.
Span< const Byte > bytes() const
Returns the conversion of the instance into UTF-8 encoding.
Positions the phase of the currently executing action.
1D data vector with value semantics (STL style).
@ WaitSome
Wait until all messages in the list are processed.
void mpAllGather(IMessagePassingMng *pm, const ISerializer *send_serializer, ISerializer *receive_serialize)
allGather() message for serialization
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
ArrayView< Int64 > Int64ArrayView
C equivalent of a 1D array of 64-bit integers.
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
Array< Byte > ByteArray
Dynamic one-dimensional array of characters.
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
UniqueArray< Byte > ByteUniqueArray
Dynamic 1D array of characters.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.
Ref< TrueType > createRef(Args &&... args)
Creates an instance of type TrueType with arguments Args and returns a reference to it.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.