14#include "arcane/impl/ParallelExchanger.h"
16#include "arcane/utils/NotSupportedException.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/PlatformUtils.h"
21#include "arcane/core/IParallelMng.h"
22#include "arcane/core/SerializeBuffer.h"
23#include "arcane/core/Timer.h"
24#include "arcane/core/ISerializeMessageList.h"
25#include "arcane/core/internal/SerializeMessage.h"
48ParallelExchanger(Ref<IParallelMng> pm)
49: TraceAccessor(pm->traceMng())
51, m_timer(pm->timerMng(),
"ParallelExchangerTimer", Timer::TimerReal)
53 String use_collective_str = platform::getEnvironmentVariable(
"ARCANE_PARALLEL_EXCHANGER_USE_COLLECTIVE");
54 if (use_collective_str ==
"1" || use_collective_str ==
"TRUE")
55 m_exchange_mode = EM_Collective;
64 for (
auto* buf : m_comms_buf)
67 delete m_own_send_message;
68 delete m_own_recv_message;
74IParallelMng* ParallelExchanger::
77 return m_parallel_mng.get();
83bool ParallelExchanger::
84initializeCommunicationsMessages()
88 gather_input_send_ranks[0] = nb_send_rank;
90 std::begin(gather_input_send_ranks) + 1);
93 Integer nb_rank = m_parallel_mng->commSize();
94 m_parallel_mng->allGatherVariable(gather_input_send_ranks,
95 gather_output_send_ranks);
99 Int32 my_rank = m_parallel_mng->commRank();
102 for (
Integer i = 0; i < nb_rank; ++i) {
103 Integer nb_comm = gather_output_send_ranks[gather_index];
104 total_comm_rank += nb_comm;
106 for (
Integer z = 0; z < nb_comm; ++z) {
107 Integer current_rank = gather_output_send_ranks[gather_index + z];
108 if (current_rank == my_rank)
111 gather_index += nb_comm;
115 if (total_comm_rank == 0)
118 _initializeCommunicationsMessages();
126void ParallelExchanger::
131 _initializeCommunicationsMessages();
137void ParallelExchanger::
138_initializeCommunicationsMessages()
140 if (m_verbosity_level >= 1) {
141 info() <<
"ParallelExchanger " << m_name <<
" : nb_send=" << m_send_ranks.size()
142 <<
" nb_recv=" << m_recv_ranks.size();
143 if (m_verbosity_level >= 2) {
144 info() <<
"ParallelExchanger " << m_name <<
" : send=" << m_send_ranks;
145 info() <<
"ParallelExchanger " << m_name <<
" : recv=" << m_recv_ranks;
149 Int32 my_rank = m_parallel_mng->commRank();
151 for (Int32 msg_rank : m_send_ranks) {
152 auto* comm =
new SerializeMessage(my_rank, msg_rank, ISerializeMessage::MT_Send);
155 if (my_rank == msg_rank)
156 m_own_send_message = comm;
158 m_comms_buf.add(comm);
159 m_send_serialize_infos.add(comm);
166void ParallelExchanger::
177void ParallelExchanger::
181 info() <<
"ParallelExchanger " <<
m_name <<
": ProcessExchange (begin)"
186 _processExchange(options);
190 info() <<
"ParallelExchanger " <<
m_name <<
": ProcessExchange (end)"
191 <<
" total_time=" <<
m_timer.lastActivationTime();
197void ParallelExchanger::
200 if (m_verbosity_level >= 1) {
201 Int64 total_size = 0;
203 Int64 message_size = comm->trueSerializer()->totalSize();
204 total_size += message_size;
205 if (m_verbosity_level >= 2)
206 info() <<
"Send rank=" << comm->destination() <<
" size=" << message_size;
208 info() <<
"ParallelExchanger " << m_name <<
": ProcessExchange"
209 <<
" total_size=" << total_size <<
" nb_message=" << m_comms_buf.size();
212 bool use_all_to_all =
false;
214 use_all_to_all =
true;
219 Int32 my_rank = m_parallel_mng->commRank();
220 for (Int32 msg_rank : m_recv_ranks) {
221 auto* comm =
new SerializeMessage(my_rank, msg_rank, ISerializeMessage::MT_Recv);
224 if (my_rank == msg_rank)
225 m_own_recv_message = comm;
227 m_comms_buf.add(comm);
228 m_recv_serialize_infos.add(comm);
232 _processExchangeCollective();
236 _processExchangeWithControl(max_pending);
238 m_parallel_mng->processMessages(m_comms_buf);
240 if (m_own_send_message && m_own_recv_message) {
241 m_own_recv_message->serializer()->copy(m_own_send_message->serializer());
246 for (SerializeMessage* comm : m_recv_serialize_infos)
247 comm->serializer()->setMode(ISerializer::ModeGet);
253void ParallelExchanger::
254_processExchangeCollective()
256 info() <<
"Using collective exchange in ParallelExchanger";
258 IParallelMng* pm = m_parallel_mng.get();
259 Int32 nb_rank = pm->commSize();
267 for (SerializeMessage* comm : m_send_serialize_infos) {
268 auto* sbuf = comm->trueSerializer();
269 Span<Byte> val_buf = sbuf->globalBuffer();
270 Int32 rank = comm->destRank();
276 Timer::SimplePrinter sp(traceMng(),
"ParallelExchanger: sending sizes with AllToAll");
277 pm->allToAll(send_counts, recv_counts, 1);
284 Int32 total_send = 0;
285 Int32 total_recv = 0;
286 Int64 int64_total_send = 0;
287 Int64 int64_total_recv = 0;
288 for (Integer i = 0; i < nb_rank; ++i) {
289 send_indexes[i] = total_send;
290 recv_indexes[i] = total_recv;
291 total_send += send_counts[i];
292 total_recv += recv_counts[i];
293 int64_total_send += send_counts[i];
294 int64_total_recv += recv_counts[i];
298 if (int64_total_send != total_send)
299 ARCANE_FATAL(
"Message to send is too big size={0} max=2^31", int64_total_send);
300 if (int64_total_recv != total_recv)
301 ARCANE_FATAL(
"Message to receive is too big size={0} max=2^31", int64_total_recv);
305 bool is_verbose = (m_verbosity_level >= 1);
306 if (m_verbosity_level >= 2) {
307 for (Integer i = 0; i < nb_rank; ++i) {
308 if (send_counts[i] != 0 || recv_counts[i] != 0)
309 info() <<
"INFOS: rank=" << i <<
" send_count=" << send_counts[i]
310 <<
" send_idx=" << send_indexes[i]
311 <<
" recv_count=" << recv_counts[i]
312 <<
" recv_idx=" << recv_indexes[i];
317 for (SerializeMessage* comm : m_send_serialize_infos) {
318 auto* sbuf = comm->trueSerializer();
319 Span<Byte> val_buf = sbuf->globalBuffer();
320 Int32 rank = comm->destRank();
322 info() <<
"SEND rank=" << rank <<
" size=" << send_counts[rank]
323 <<
" idx=" << send_indexes[rank]
324 <<
" buf_size=" << val_buf.size();
325 ByteArrayView dest_buf(send_counts[rank], &send_buf[send_indexes[rank]]);
326 dest_buf.copy(val_buf);
330 info() <<
"AllToAllVariable total_send=" << total_send
331 <<
" total_recv=" << total_recv;
334 Timer::SimplePrinter sp(traceMng(),
"ParallelExchanger: sending values with AllToAll");
335 pm->allToAllVariable(send_buf, send_counts, send_indexes, recv_buf, recv_counts, recv_indexes);
338 for (SerializeMessage* comm : m_recv_serialize_infos) {
339 auto* sbuf = comm->trueSerializer();
340 Int32 rank = comm->destRank();
342 info() <<
"RECV rank=" << rank <<
" size=" << recv_counts[rank]
343 <<
" idx=" << recv_indexes[rank];
344 ByteArrayView orig_buf(recv_counts[rank], &recv_buf[recv_indexes[rank]]);
346 sbuf->preallocate(orig_buf.size());
347 sbuf->globalBuffer().copy(orig_buf);
348 sbuf->setFromSizes();
373void ParallelExchanger::
374setVerbosityLevel(
Int32 v)
384void ParallelExchanger::
411 const int nb_phase = 4;
412 int phase1 = a->destination().value() % nb_phase;
414 if (phase1 != phase2)
415 return phase1 < phase2;
418 if (a->isSend() != b->
isSend())
419 return (a->isSend() ?
false :
true);
420 return a->source() < b->
source();
431void ParallelExchanger::
432_processExchangeWithControl(
Int32 max_pending_message)
437 auto message_list{ m_parallel_mng->createSerializeMessageListRef() };
440 std::sort(sorted_messages.
begin(), sorted_messages.
end(), SortFunctor{});
446 max_pending_message =
math::max(4, max_pending_message);
449 Integer nb_to_add = max_pending_message;
453 if (verbosity_level >= 1)
454 info() <<
"ParallelExchanger " <<
m_name <<
" : process exchange WITH CONTROL"
455 <<
" nb_message=" << nb_message <<
" max_pending=" << max_pending_message;
457 while (position < nb_message) {
458 for (
Integer i = 0; i < nb_to_add; ++i) {
459 if (position >= nb_message)
462 if (verbosity_level >= 2)
463 info() <<
"Add Message p=" << position <<
" is_send?=" << message->
isSend() <<
" source=" << message->
source()
465 message_list->addMessage(message);
470 if (position >= nb_message) {
471 message_list->waitMessages(Parallel::WaitAll);
477 if (verbosity_level >= 2)
478 info() <<
"Wait nb_done=" << nb_done;
480 nb_done = max_pending_message;
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
Various mathematical functions.
Integer size() const
Number of elements in the vector.
iterator begin()
Iterator over the first element of the array.
iterator end()
Iterator over the first element after the end of the array.
constexpr Integer size() const noexcept
Number of elements in the array.
Interface of the parallelism manager for a subdomain.
Interface for a serialization message between IMessagePassingMng.
virtual bool isSend() const =0
true if it should send, false if it should receive
virtual MessageRank destination() const =0
Destination rank (if isSend() is true) or sender.
virtual MessageRank source() const =0
Message sender rank.
Int32 value() const
Rank value.
Options for IParallelMng::processExchange().
Int32 maxPendingMessage() const
Maximum number of pending messages.
void setExchangeMode(eExchangeMode mode)
Sets the exchange mode.
eExchangeMode
Exchange mode.
eExchangeMode exchangeMode() const
Specified exchange mode.
Information exchange between processors.
Int32UniqueArray m_send_ranks
List of subdomains to send.
String m_name
Instance name used for display.
UniqueArray< SerializeMessage * > m_recv_serialize_infos
List of messages to receive.
void processExchange() override
Performs the exchange using the default options of ParallelExchangerOptions.
Int32 m_verbosity_level
Verbosity level.
Timer m_timer
Timer to measure time spent in exchanges.
String name() const override
Instance name.
UniqueArray< SerializeMessage * > m_send_serialize_infos
List of messages to send.
Int32UniqueArray m_recv_ranks
List of subdomains to receive.
eExchangeMode m_exchange_mode
Exchange mode.
UniqueArray< ISerializeMessage * > m_comms_buf
List of messages to send and receive.
Reference to an instance.
Message using a SerializeBuffer.
Unicode character string.
Sentinel for the timer. The sentinel associated with a timer allows it to be triggered upon its const...
TraceMessage info() const
Flow for an information message.
1D data vector with value semantics (STL style).
T max(const T &a, const T &b, const T &c)
Returns the maximum of three elements.
@ WaitSome
Wait until at least one message in the list is processed.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Integer arcaneCheckArraySize(unsigned long long size)
Checks that size can be converted into an 'Integer' to serve as the size of an array....
ArrayView< Byte > ByteArrayView
C equivalent of a 1D array of characters.
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
UniqueArray< Byte > ByteUniqueArray
Dynamic 1D array of characters.
UniqueArray< Int32 > Int32UniqueArray
Dynamic 1D array of 32-bit integers.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
UniqueArray< Integer > IntegerUniqueArray
Dynamic 1D array of integers.
std::int32_t Int32
Signed integer type of 32 bits.