14#include "arcane/utils/Array.h"
15#include "arcane/utils/HPReal.h"
16#include "arcane/utils/NumericTypes.h"
17#include "arcane/utils/NotImplementedException.h"
18#include "arcane/utils/ScopedPtr.h"
19#include "arcane/utils/ITraceMng.h"
20#include "arcane/utils/ValueConvert.h"
21#include "arcane/utils/FatalErrorException.h"
23#include "arcane/core/ParallelMngDispatcher.h"
24#include "arcane/core/IParallelDispatch.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ITimeStats.h"
27#include "arcane/core/IParallelNonBlockingCollective.h"
28#include "arcane/core/internal/ParallelMngInternal.h"
30#include "arcane/accelerator/core/Runner.h"
31#include "arcane/accelerator/core/RunQueueBuildInfo.h"
33#include "arccore/message_passing/Dispatchers.h"
35#include "arccore/message_passing/MessagePassingMng.h"
36#include "arccore/message_passing/IControlDispatcher.h"
37#include "arccore/message_passing/ISerializeDispatcher.h"
38#include "arccore/message_passing/PointToPointMessageInfo.h"
39#include "arccore/message_passing/IRequestList.h"
40#include "arccore/message_passing/ISerializeMessageList.h"
41#include "arccore/trace/internal/TimeMetric.h"
48using namespace Arccore::MessagePassing;
53ParallelMngDispatcherBuildInfo::
56: m_comm_rank(mpm->commRank())
57, m_comm_size(mpm->commSize())
58, m_dispatchers(dispatchers)
59, m_message_passing_mng(mpm)
67ParallelMngDispatcherBuildInfo::
70: m_comm_rank(mpm_ref->commRank())
71, m_comm_size(mpm_ref->commSize())
72, m_dispatchers(dispatchers.get())
73, m_dispatchers_ref(dispatchers)
74, m_message_passing_mng(mpm_ref.get())
75, m_message_passing_mng_ref(mpm_ref)
// Builds the info from a communicator rank and size only.
// Both the dispatcher container pointer and the message passing manager
// pointer start out null with this overload.
83ParallelMngDispatcherBuildInfo::
84ParallelMngDispatcherBuildInfo(Int32 comm_rank, Int32 comm_size)
85: m_comm_rank(comm_rank)
86, m_comm_size(comm_size)
87, m_dispatchers(nullptr)
88, m_message_passing_mng(nullptr)
96void ParallelMngDispatcherBuildInfo::
101 m_dispatchers = m_dispatchers_ref.get();
103 if (!m_message_passing_mng) {
104 auto* x =
new MP::MessagePassingMng(m_comm_rank, m_comm_size, m_dispatchers);
105 m_message_passing_mng = x;
106 m_message_passing_mng_ref =
makeRef(x);
108 if (!m_message_passing_mng_ref.get())
109 m_message_passing_mng_ref =
makeRef(m_message_passing_mng);
115ParallelMngDispatcher::DefaultControlDispatcher::
116DefaultControlDispatcher(IParallelMng* pm)
121void ParallelMngDispatcher::DefaultControlDispatcher::
124 m_parallel_mng->waitAllRequests(requests);
127void ParallelMngDispatcher::DefaultControlDispatcher::
129 bool is_non_blocking)
133 done_requests = m_parallel_mng->testSomeRequests(requests);
135 done_requests = m_parallel_mng->waitSomeRequests(requests);
137 for (
int x : done_requests)
148void ParallelMngDispatcher::DefaultControlDispatcher::
151 m_parallel_mng->barrier();
154Request ParallelMngDispatcher::DefaultControlDispatcher::
157 return m_parallel_mng->nonBlockingCollective()->barrier();
160MessageId ParallelMngDispatcher::DefaultControlDispatcher::
163 return m_parallel_mng->probe(message);
169 return m_parallel_mng->legacyProbe(message);
173createRequestListRef()
175 return m_parallel_mng->createRequestListRef();
181void ParallelMngDispatcher::DefaultControlDispatcher::
182setProfiler(MP::IProfiler* p)
192:
public MP::ISerializeDispatcher
204 return m_parallel_mng->createSerializeMessageListRef();
208 return m_parallel_mng->sendSerializer(s, message);
212 return m_parallel_mng->receiveSerializer(s, message);
226ParallelMngDispatcher::
229, m_unsigned_char(nullptr)
230, m_signed_char(nullptr)
232, m_unsigned_short(nullptr)
234, m_unsigned_int(nullptr)
236, m_unsigned_long(nullptr)
237, m_long_long(nullptr)
238, m_unsigned_long_long(nullptr)
241, m_long_double(nullptr)
248, m_time_stats(nullptr)
249, m_mp_dispatchers_ref(bi.dispatchersRef())
250, m_message_passing_mng_ref(bi.messagePassingMngRef())
251, m_control_dispatcher(new DefaultControlDispatcher(this))
252, m_serialize_dispatcher(new SerializeDispatcher(this))
// Destructor.
// Resets the dispatcher-container reference first, then deletes the
// internal part, the serialize and control dispatchers, and finally the
// per-type dispatchers.
// NOTE(review): this listing elides some lines, so the set of per-type
// dispatchers deleted here may be larger than what is shown.
260ParallelMngDispatcher::
261~ParallelMngDispatcher()
263 m_mp_dispatchers_ref.
reset();
265 delete m_parallel_mng_internal;
267 delete m_serialize_dispatcher;
268 delete m_control_dispatcher;
270 delete m_signed_char;
271 delete m_unsigned_char;
273 delete m_unsigned_short;
275 delete m_unsigned_int;
277 delete m_unsigned_long;
279 delete m_unsigned_long_long;
283 delete m_long_double;
// Replaces the control dispatcher: the current instance is deleted and
// `d` is stored in its place. The destructor deletes m_control_dispatcher,
// so this object takes ownership of `d`.
// NOTE(review): _setArccoreDispatchers() hands m_control_dispatcher to the
// dispatcher container; presumably this setter must be called before that
// registration so the container does not keep the deleted pointer - confirm.
294void ParallelMngDispatcher::
295_setControlDispatcher(MP::IControlDispatcher* d)
297 delete m_control_dispatcher;
298 m_control_dispatcher = d;
// Replaces the serialize dispatcher: the current instance is deleted and
// `d` is stored in its place. The destructor deletes m_serialize_dispatcher,
// so this object takes ownership of `d`.
// NOTE(review): _setArccoreDispatchers() hands m_serialize_dispatcher to the
// dispatcher container; presumably this setter must be called before that
// registration so the container does not keep the deleted pointer - confirm.
304void ParallelMngDispatcher::
305_setSerializeDispatcher(MP::ISerializeDispatcher* d)
307 delete m_serialize_dispatcher;
308 m_serialize_dispatcher = d;
// Registers, in the dispatcher container (m_mp_dispatchers_ref), the
// object returned by toArccoreDispatcher() for each per-type dispatcher,
// then the control dispatcher and the serialize dispatcher.
// Every per-type pointer is dereferenced without a null check, and the
// constructor initializes several of them to nullptr.
// NOTE(review): presumably the derived class must have assigned all
// per-type dispatchers before calling this - confirm against callers.
314void ParallelMngDispatcher::
315_setArccoreDispatchers()
317 m_mp_dispatchers_ref->setDispatcher(m_char->toArccoreDispatcher());
318 m_mp_dispatchers_ref->setDispatcher(m_signed_char->toArccoreDispatcher());
319 m_mp_dispatchers_ref->setDispatcher(m_unsigned_char->toArccoreDispatcher());
320 m_mp_dispatchers_ref->setDispatcher(m_short->toArccoreDispatcher());
321 m_mp_dispatchers_ref->setDispatcher(m_unsigned_short->toArccoreDispatcher());
322 m_mp_dispatchers_ref->setDispatcher(m_int->toArccoreDispatcher());
323 m_mp_dispatchers_ref->setDispatcher(m_unsigned_int->toArccoreDispatcher());
324 m_mp_dispatchers_ref->setDispatcher(m_long->toArccoreDispatcher());
325 m_mp_dispatchers_ref->setDispatcher(m_unsigned_long->toArccoreDispatcher());
326 m_mp_dispatchers_ref->setDispatcher(m_long_long->toArccoreDispatcher());
327 m_mp_dispatchers_ref->setDispatcher(m_unsigned_long_long->toArccoreDispatcher());
328 m_mp_dispatchers_ref->setDispatcher(m_float->toArccoreDispatcher());
329 m_mp_dispatchers_ref->setDispatcher(m_double->toArccoreDispatcher());
330 m_mp_dispatchers_ref->setDispatcher(m_long_double->toArccoreDispatcher());
333 m_mp_dispatchers_ref->setDispatcher(m_control_dispatcher);
335 m_mp_dispatchers_ref->setDispatcher(m_serialize_dispatcher);
344 return m_message_passing_mng_ref.get();
369 _messagePassingMng()->setTimeMetricCollector(c);
377_communicationTimeMetricAction()
const
379 return Timer::phaseAction(
timeStats(), TP_Communication);
385void ParallelMngDispatcher::
386broadcastString(
String& str, Int32 rank)
391 if (rank == my_rank) {
392 len_info[0] = bytes.size();
395 broadcast(utf8_array, rank);
400 broadcast(utf8_array, rank);
401 str = String::fromUtf8(utf8_array);
413 size = bytes.largeSize();
422 broadcast(bytes, rank);
432 mpAllGather(_messagePassingMng(), send_serializer, recv_serializer);
445 message_list->addMessage(m);
447 message_list->waitMessages(Parallel::WaitAll);
460 message_list->addMessage(v.get());
462 message_list->waitMessages(Parallel::WaitAll);
471 return makeRef(_createSerializeMessageList());
480 return _createSerializeMessageList();
489 return _createSubParallelMng(kept_ranks);
498 return makeRef(_createSubParallelMng(kept_ranks));
505_createSubParallelMngRef([[maybe_unused]]
Int32 color, [[maybe_unused]]
Int32 key)
516 Timer::Phase tphase(
timeStats(), TP_Communication);
519 request_list->add(requests);
520 request_list->wait(wait_type);
522 for (
Integer i = 0; i < nb_request; ++i)
523 requests[i] = request_list->request(i);
524 return request_list->doneRequestIndexes();
// Generates the ParallelMngDispatcher member functions for one element
// type.
//   field: the per-type dispatcher member to forward to (e.g. m_int).
//   type : the element type handled by that dispatcher (e.g. int).
// Each expanded member visible in this listing declares a local
// Timer::Phase built from timeStats() and TP_Communication, then forwards
// its arguments unchanged to the same-named method of `field`.
// Exception: both reduce() overloads forward to `field`->allReduce().
// One member returning IParallelDispatchT<type>* has its name and body
// elided in this listing.
548#define ARCANE_PARALLEL_MANAGER_DISPATCH(field, type) \
549 void ParallelMngDispatcher:: \
550 allGather(ConstArrayView<type> send_buf, ArrayView<type> recv_buf) \
552 Timer::Phase tphase(timeStats(), TP_Communication); \
553 (field)->allGather(send_buf, recv_buf); \
555 void ParallelMngDispatcher:: \
556 gather(ConstArrayView<type> send_buf, ArrayView<type> recv_buf, Integer rank) \
558 Timer::Phase tphase(timeStats(), TP_Communication); \
559 (field)->gather(send_buf, recv_buf, rank); \
561 void ParallelMngDispatcher:: \
562 allGatherVariable(ConstArrayView<type> send_buf, Array<type>& recv_buf) \
564 Timer::Phase tphase(timeStats(), TP_Communication); \
565 (field)->allGatherVariable(send_buf, recv_buf); \
567 void ParallelMngDispatcher:: \
568 gatherVariable(ConstArrayView<type> send_buf, Array<type>& recv_buf, Integer rank) \
570 Timer::Phase tphase(timeStats(), TP_Communication); \
571 (field)->gatherVariable(send_buf, recv_buf, rank); \
573 void ParallelMngDispatcher:: \
574 scatterVariable(ConstArrayView<type> send_buf, ArrayView<type> recv_buf, Integer root) \
576 Timer::Phase tphase(timeStats(), TP_Communication); \
577 (field)->scatterVariable(send_buf, recv_buf, root); \
579 type ParallelMngDispatcher:: \
580 reduce(eReduceType rt, type v) \
582 Timer::Phase tphase(timeStats(), TP_Communication); \
583 return (field)->allReduce(rt, v); \
585 void ParallelMngDispatcher:: \
586 reduce(eReduceType rt, ArrayView<type> v) \
588 Timer::Phase tphase(timeStats(), TP_Communication); \
589 (field)->allReduce(rt, v); \
591 void ParallelMngDispatcher:: \
592 broadcast(ArrayView<type> send_buf, Integer id) \
594 Timer::Phase tphase(timeStats(), TP_Communication); \
595 (field)->broadcast(send_buf, id); \
597 void ParallelMngDispatcher:: \
598 send(ConstArrayView<type> values, Integer id) \
600 Timer::Phase tphase(timeStats(), TP_Communication); \
601 (field)->send(values, id); \
603 void ParallelMngDispatcher:: \
604 recv(ArrayView<type> values, Integer id) \
606 Timer::Phase tphase(timeStats(), TP_Communication); \
607 (field)->recv(values, id); \
609 Parallel::Request ParallelMngDispatcher:: \
610 send(ConstArrayView<type> values, Integer id, bool is_blocked) \
612 Timer::Phase tphase(timeStats(), TP_Communication); \
613 return (field)->send(values, id, is_blocked); \
615 Request ParallelMngDispatcher:: \
616 send(Span<const type> values, const PointToPointMessageInfo& message) \
618 Timer::Phase tphase(timeStats(), TP_Communication); \
619 return (field)->send(values, message); \
621 Parallel::Request ParallelMngDispatcher:: \
622 recv(ArrayView<type> values, Integer id, bool is_blocked) \
624 Timer::Phase tphase(timeStats(), TP_Communication); \
625 return (field)->recv(values, id, is_blocked); \
627 Request ParallelMngDispatcher:: \
628 receive(Span<type> values, const PointToPointMessageInfo& message) \
630 Timer::Phase tphase(timeStats(), TP_Communication); \
631 return (field)->receive(values, message); \
633 void ParallelMngDispatcher:: \
634 sendRecv(ConstArrayView<type> send_buf, ArrayView<type> recv_buf, Integer id) \
636 Timer::Phase tphase(timeStats(), TP_Communication); \
637 (field)->sendRecv(send_buf, recv_buf, id); \
639 void ParallelMngDispatcher:: \
640 allToAll(ConstArrayView<type> send_buf, ArrayView<type> recv_buf, Integer count) \
642 Timer::Phase tphase(timeStats(), TP_Communication); \
643 (field)->allToAll(send_buf, recv_buf, count); \
645 void ParallelMngDispatcher:: \
646 allToAllVariable(ConstArrayView<type> send_buf, Int32ConstArrayView send_count, \
647 Int32ConstArrayView send_index, ArrayView<type> recv_buf, \
648 Int32ConstArrayView recv_count, Int32ConstArrayView recv_index) \
650 Timer::Phase tphase(timeStats(), TP_Communication); \
651 (field)->allToAllVariable(send_buf, send_count, send_index, recv_buf, recv_count, recv_index); \
653 type ParallelMngDispatcher:: \
654 scan(eReduceType rt, type v) \
656 Timer::Phase tphase(timeStats(), TP_Communication); \
657 return (field)->scan(rt, v); \
659 void ParallelMngDispatcher:: \
660 computeMinMaxSum(type val, type& min_val, type& max_val, type& sum_val, Int32& min_proc, Int32& max_proc) \
662 Timer::Phase tphase(timeStats(), TP_Communication); \
663 (field)->computeMinMaxSum(val, min_val, max_val, sum_val, min_proc, max_proc); \
665 IParallelDispatchT<type>* ParallelMngDispatcher:: \
670 void ParallelMngDispatcher:: \
671 computeMinMaxSum(ConstArrayView<type> values, \
672 ArrayView<type> min_values, \
673 ArrayView<type> max_values, \
674 ArrayView<type> sum_values, \
675 ArrayView<Int32> min_ranks, \
676 ArrayView<Int32> max_ranks) \
678 Timer::Phase tphase(timeStats(), TP_Communication); \
679 (field)->computeMinMaxSum(values, min_values, max_values, sum_values, min_ranks, max_ranks); \
681 void ParallelMngDispatcher:: \
682 scan(eReduceType rt, ArrayView<type> v) \
684 Timer::Phase tphase(timeStats(), TP_Communication); \
685 (field)->scan(rt, v); \
688ARCANE_PARALLEL_MANAGER_DISPATCH(m_char,
char)
689ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_char,
unsigned char)
690ARCANE_PARALLEL_MANAGER_DISPATCH(m_signed_char,
signed char)
691ARCANE_PARALLEL_MANAGER_DISPATCH(m_short,
short)
692ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_short,
unsigned short)
693ARCANE_PARALLEL_MANAGER_DISPATCH(m_int,
int)
694ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_int,
unsigned int)
695ARCANE_PARALLEL_MANAGER_DISPATCH(m_long,
long)
696ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_long,
unsigned long)
697ARCANE_PARALLEL_MANAGER_DISPATCH(m_long_long,
long long)
698ARCANE_PARALLEL_MANAGER_DISPATCH(m_unsigned_long_long,
unsigned long long)
699ARCANE_PARALLEL_MANAGER_DISPATCH(m_float,
float)
700ARCANE_PARALLEL_MANAGER_DISPATCH(m_double,
double)
701ARCANE_PARALLEL_MANAGER_DISPATCH(m_long_double,
long double)
702ARCANE_PARALLEL_MANAGER_DISPATCH(m_apreal,
APReal)
703ARCANE_PARALLEL_MANAGER_DISPATCH(m_real2,
Real2)
704ARCANE_PARALLEL_MANAGER_DISPATCH(m_real3,
Real3)
705ARCANE_PARALLEL_MANAGER_DISPATCH(m_real2x2,
Real2x2)
706ARCANE_PARALLEL_MANAGER_DISPATCH(m_real3x3,
Real3x3)
707ARCANE_PARALLEL_MANAGER_DISPATCH(m_hpreal,
HPReal)
#define ARCANE_CHECK_POINTER(ptr)
Macro returning the pointer ptr if it is not null or throwing an exception if it is null.
#define ARCANE_THROW(exception_class,...)
Macro for throwing an exception with formatting.
Brief list of message exchange functions.
#define ARCCORE_DEFINE_REFERENCE_COUNTED_CLASS(class_name)
Macro to define methods and types for a class that uses a reference counter.
Emulation of real number in arbitrary precision.
Modifiable view of an array of type T.
void fill(const T &o) noexcept
Fills the array with the value o.
constexpr Integer size() const noexcept
Returns the size of the array.
Constant view of an array of type T.
Class implementing a High-Precision real number.
Interface for an 'IParallelMng' container.
Interface of the parallelism manager for a subdomain.
virtual Int32 commRank() const =0
Rank of this instance in the communicator.
virtual Ref< Parallel::IRequestList > createRequestListRef()=0
Creates a request list for this manager.
Interface managing statistics on execution.
Interface managing execution time statistics.
virtual ITimeMetricCollector * metricCollector()=0
Associated collection interface.
Interface of the dispatcher container.
Interface of the message passing manager.
Interface for a serialization message list.
Interface for a serialization message between IMessagePassingMng.
Information about the source of a message.
Information for sending/receiving a point-to-point message.
Exception when a function is not implemented.
Request sendSerializer(const ISerializer *s, const PointToPointMessageInfo &message) override
Sending message.
Request receiveSerializer(ISerializer *s, const PointToPointMessageInfo &message) override
Receiving message.
Ref< ISerializeMessageList > createSerializeMessageListRef() override
Create a list of serialization messages.
ISerializeMessageList * createSerializeMessageList() final
Creates a list to manage 'ISerializeMessage'.
void setTimeStats(ITimeStats *ts) override
Sets the statistics manager.
void allGather(ISerializer *send_serializer, ISerializer *recv_serializer) override
Redefines allGather here to avoid hiding the symbol in derived classes.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Creates a new parallelism manager for a subset of ranks.
void processMessages(ConstArrayView< ISerializeMessage * > messages) override
Executes the operations of the messages in messages.
IMessagePassingMng * messagePassingMng() const override
Associated Arccore message passing manager.
ITimeMetricCollector * timeMetricCollector() const override
Arccore temporal statistics collector (can be null).
IParallelMng * createSubParallelMng(Int32ConstArrayView kept_ranks) final
Creates a new parallelism manager for a subset of ranks.
UniqueArray< Integer > testSomeRequests(ArrayView< Request > requests) override
Tests whether one of the requests in requests is complete.
ITimeStats * timeStats() const override
Associated statistics manager (can be null).
Ref< ISerializeMessageList > createSerializeMessageListRef() final
Creates a list to manage 'ISerializeMessage'.
UniqueArray< Integer > waitSomeRequests(ArrayView< Request > requests) override
Blocks until one of the requests in requests completes.
void broadcastMemoryBuffer(ByteArray &bytes, Int32 rank) override
Performs a broadcast of a memory region.
Internal part of IParallelMng.
Class managing a 2-dimensional real vector.
Class managing a 2x2 matrix of reals.
Class managing a 3-dimensional real vector.
Class managing a 3x3 real matrix.
void reset()
Sets the instance to the null pointer.
Reference to an instance.
View of an array of elements of type T.
Unicode character string.
Span< const Byte > bytes() const
Returns the conversion of the instance into UTF-8 encoding.
Sentinel for collecting temporal information.
Sets the phase of the currently executing action.
1D data vector with value semantics (STL style).
@ WaitSome
Wait until at least one message in the list is processed.
void mpAllGather(IMessagePassingMng *pm, const ISerializer *send_serializer, ISerializer *receive_serialize)
allGather() message for serialization
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
ArrayView< Int64 > Int64ArrayView
C equivalent of a 1D array of 64-bit integers.
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
Array< Byte > ByteArray
Dynamic one-dimensional array of characters.
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
UniqueArray< Byte > ByteUniqueArray
Dynamic 1D array of characters.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.
Ref< TrueType > createRef(Args &&... args)
Creates an instance of type TrueType with arguments Args and returns a reference to it.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.