Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
ParallelExchanger.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ParallelExchanger.cc (C) 2000-2025 */
9/* */
10/* Information exchange between processors. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/impl/ParallelExchanger.h"
15
16#include "arcane/utils/NotSupportedException.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/PlatformUtils.h"
19
21#include "arcane/core/IParallelMng.h"
22#include "arcane/core/SerializeBuffer.h"
23#include "arcane/core/Timer.h"
24#include "arcane/core/ISerializeMessageList.h"
25#include "arcane/core/internal/SerializeMessage.h"
26
27#include <algorithm>
28
29/*---------------------------------------------------------------------------*/
30/*---------------------------------------------------------------------------*/
31
32namespace Arcane
33{
34
35/*---------------------------------------------------------------------------*/
36/*---------------------------------------------------------------------------*/
37
38ParallelExchanger::
39ParallelExchanger(IParallelMng* pm)
41{
42}
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
46
47ParallelExchanger::
48ParallelExchanger(Ref<IParallelMng> pm)
49: TraceAccessor(pm->traceMng())
50, m_parallel_mng(pm)
51, m_timer(pm->timerMng(), "ParallelExchangerTimer", Timer::TimerReal)
52{
53 String use_collective_str = platform::getEnvironmentVariable("ARCANE_PARALLEL_EXCHANGER_USE_COLLECTIVE");
54 if (use_collective_str == "1" || use_collective_str == "TRUE")
55 m_exchange_mode = EM_Collective;
56}
57
58/*---------------------------------------------------------------------------*/
59/*---------------------------------------------------------------------------*/
60
61ParallelExchanger::
62~ParallelExchanger()
63{
64 for (auto* buf : m_comms_buf)
65 delete buf;
66 m_comms_buf.clear();
67 delete m_own_send_message;
68 delete m_own_recv_message;
69}
70
71/*---------------------------------------------------------------------------*/
72/*---------------------------------------------------------------------------*/
73
74IParallelMng* ParallelExchanger::
75parallelMng() const
76{
77 return m_parallel_mng.get();
78}
79
80/*---------------------------------------------------------------------------*/
81/*---------------------------------------------------------------------------*/
82
83bool ParallelExchanger::
84initializeCommunicationsMessages()
85{
86 Int32 nb_send_rank = m_send_ranks.size();
87 UniqueArray<Int32> gather_input_send_ranks(nb_send_rank + 1);
88 gather_input_send_ranks[0] = nb_send_rank;
89 std::copy(std::begin(m_send_ranks), std::end(m_send_ranks),
90 std::begin(gather_input_send_ranks) + 1);
91
92 IntegerUniqueArray gather_output_send_ranks;
93 Integer nb_rank = m_parallel_mng->commSize();
94 m_parallel_mng->allGatherVariable(gather_input_send_ranks,
95 gather_output_send_ranks);
96
97 m_recv_ranks.clear();
98 Integer total_comm_rank = 0;
99 Int32 my_rank = m_parallel_mng->commRank();
100 {
101 Integer gather_index = 0;
102 for (Integer i = 0; i < nb_rank; ++i) {
103 Integer nb_comm = gather_output_send_ranks[gather_index];
104 total_comm_rank += nb_comm;
105 ++gather_index;
106 for (Integer z = 0; z < nb_comm; ++z) {
107 Integer current_rank = gather_output_send_ranks[gather_index + z];
108 if (current_rank == my_rank)
109 m_recv_ranks.add(i);
110 }
111 gather_index += nb_comm;
112 }
113 }
114
115 if (total_comm_rank == 0)
116 return true;
117
118 _initializeCommunicationsMessages();
119
120 return false;
121}
122
123/*---------------------------------------------------------------------------*/
124/*---------------------------------------------------------------------------*/
125
126void ParallelExchanger::
127initializeCommunicationsMessages(Int32ConstArrayView recv_ranks)
128{
129 m_recv_ranks.resize(recv_ranks.size());
130 m_recv_ranks.copy(recv_ranks);
131 _initializeCommunicationsMessages();
132}
133
134/*---------------------------------------------------------------------------*/
135/*---------------------------------------------------------------------------*/
136
137void ParallelExchanger::
138_initializeCommunicationsMessages()
139{
140 if (m_verbosity_level >= 1) {
141 info() << "ParallelExchanger " << m_name << " : nb_send=" << m_send_ranks.size()
142 << " nb_recv=" << m_recv_ranks.size();
143 if (m_verbosity_level >= 2) {
144 info() << "ParallelExchanger " << m_name << " : send=" << m_send_ranks;
145 info() << "ParallelExchanger " << m_name << " : recv=" << m_recv_ranks;
146 }
147 }
148
149 Int32 my_rank = m_parallel_mng->commRank();
150
151 for (Int32 msg_rank : m_send_ranks) {
152 auto* comm = new SerializeMessage(my_rank, msg_rank, ISerializeMessage::MT_Send);
153 // It is useless to send messages to ourselves.
154 // (Plus, it crashes certain versions of MPI...)
155 if (my_rank == msg_rank)
156 m_own_send_message = comm;
157 else
158 m_comms_buf.add(comm);
159 m_send_serialize_infos.add(comm);
160 }
161}
162
163/*---------------------------------------------------------------------------*/
164/*---------------------------------------------------------------------------*/
165
166void ParallelExchanger::
167processExchange()
168{
171 processExchange(options);
172}
173
174/*---------------------------------------------------------------------------*/
175/*---------------------------------------------------------------------------*/
176
177void ParallelExchanger::
178processExchange(const ParallelExchangerOptions& options)
179{
180 if (m_verbosity_level >= 1)
181 info() << "ParallelExchanger " << m_name << ": ProcessExchange (begin)"
182 << " date=" << platform::getCurrentDateTime();
183
184 {
185 Timer::Sentry sentry(&m_timer);
186 _processExchange(options);
187 }
188
189 if (m_verbosity_level >= 1)
190 info() << "ParallelExchanger " << m_name << ": ProcessExchange (end)"
191 << " total_time=" << m_timer.lastActivationTime();
192}
193
194/*---------------------------------------------------------------------------*/
195/*---------------------------------------------------------------------------*/
196
197void ParallelExchanger::
198_processExchange(const ParallelExchangerOptions& options)
199{
200 if (m_verbosity_level >= 1) {
201 Int64 total_size = 0;
202 for (SerializeMessage* comm : m_send_serialize_infos) {
203 Int64 message_size = comm->trueSerializer()->totalSize();
204 total_size += message_size;
205 if (m_verbosity_level >= 2)
206 info() << "Send rank=" << comm->destination() << " size=" << message_size;
207 }
208 info() << "ParallelExchanger " << m_name << ": ProcessExchange"
209 << " total_size=" << total_size << " nb_message=" << m_comms_buf.size();
210 }
211
212 bool use_all_to_all = false;
213 if (options.exchangeMode())
214 use_all_to_all = true;
215 // TODO: handle EM_Auto case
216
217 // Generates the infos for each processor from which we will receive
218 // entities
219 Int32 my_rank = m_parallel_mng->commRank();
220 for (Int32 msg_rank : m_recv_ranks) {
221 auto* comm = new SerializeMessage(my_rank, msg_rank, ISerializeMessage::MT_Recv);
222 // It is useless to send messages to ourselves.
223 // (Plus, it crashes certain versions of MPI...)
224 if (my_rank == msg_rank)
225 m_own_recv_message = comm;
226 else
227 m_comms_buf.add(comm);
228 m_recv_serialize_infos.add(comm);
229 }
230
231 if (use_all_to_all)
232 _processExchangeCollective();
233 else {
234 Int32 max_pending = options.maxPendingMessage();
235 if (max_pending > 0)
236 _processExchangeWithControl(max_pending);
237 else
238 m_parallel_mng->processMessages(m_comms_buf);
239
240 if (m_own_send_message && m_own_recv_message) {
241 m_own_recv_message->serializer()->copy(m_own_send_message->serializer());
242 }
243 }
244
245 // Retrieves the infos of each receiver
246 for (SerializeMessage* comm : m_recv_serialize_infos)
247 comm->serializer()->setMode(ISerializer::ModeGet);
248}
249
250/*---------------------------------------------------------------------------*/
251/*---------------------------------------------------------------------------*/
252
253void ParallelExchanger::
254_processExchangeCollective()
255{
256 info() << "Using collective exchange in ParallelExchanger";
257
258 IParallelMng* pm = m_parallel_mng.get();
259 Int32 nb_rank = pm->commSize();
260
261 Int32UniqueArray send_counts(nb_rank, 0);
262 Int32UniqueArray send_indexes(nb_rank, 0);
263 Int32UniqueArray recv_counts(nb_rank, 0);
264 Int32UniqueArray recv_indexes(nb_rank, 0);
265
266 // First, determine for each proc the number of bytes to send
267 for (SerializeMessage* comm : m_send_serialize_infos) {
268 auto* sbuf = comm->trueSerializer();
269 Span<Byte> val_buf = sbuf->globalBuffer();
270 Int32 rank = comm->destRank();
271 send_counts[rank] = arcaneCheckArraySize(val_buf.size());
272 }
273
274 // Performs an AllToAll to know how many values I must receive from others.
275 {
276 Timer::SimplePrinter sp(traceMng(), "ParallelExchanger: sending sizes with AllToAll");
277 pm->allToAll(send_counts, recv_counts, 1);
278 }
279
280 // Determines the total number of infos to send and receive
281
282 // TODO: In case of overflow, it should be done in several pieces
283 // or revert to point-to-point exchanges.
284 Int32 total_send = 0;
285 Int32 total_recv = 0;
286 Int64 int64_total_send = 0;
287 Int64 int64_total_recv = 0;
288 for (Integer i = 0; i < nb_rank; ++i) {
289 send_indexes[i] = total_send;
290 recv_indexes[i] = total_recv;
291 total_send += send_counts[i];
292 total_recv += recv_counts[i];
293 int64_total_send += send_counts[i];
294 int64_total_recv += recv_counts[i];
295 }
296
297 // Checks that we do not overflow.
298 if (int64_total_send != total_send)
299 ARCANE_FATAL("Message to send is too big size={0} max=2^31", int64_total_send);
300 if (int64_total_recv != total_recv)
301 ARCANE_FATAL("Message to receive is too big size={0} max=2^31", int64_total_recv);
302
303 ByteUniqueArray send_buf(total_send);
304 ByteUniqueArray recv_buf(total_recv);
305 bool is_verbose = (m_verbosity_level >= 1);
306 if (m_verbosity_level >= 2) {
307 for (Integer i = 0; i < nb_rank; ++i) {
308 if (send_counts[i] != 0 || recv_counts[i] != 0)
309 info() << "INFOS: rank=" << i << " send_count=" << send_counts[i]
310 << " send_idx=" << send_indexes[i]
311 << " recv_count=" << recv_counts[i]
312 << " recv_idx=" << recv_indexes[i];
313 }
314 }
315
316 // Copies the serializer infos into send_buf.
317 for (SerializeMessage* comm : m_send_serialize_infos) {
318 auto* sbuf = comm->trueSerializer();
319 Span<Byte> val_buf = sbuf->globalBuffer();
320 Int32 rank = comm->destRank();
321 if (is_verbose)
322 info() << "SEND rank=" << rank << " size=" << send_counts[rank]
323 << " idx=" << send_indexes[rank]
324 << " buf_size=" << val_buf.size();
325 ByteArrayView dest_buf(send_counts[rank], &send_buf[send_indexes[rank]]);
326 dest_buf.copy(val_buf);
327 }
328
329 if (is_verbose)
330 info() << "AllToAllVariable total_send=" << total_send
331 << " total_recv=" << total_recv;
332
333 {
334 Timer::SimplePrinter sp(traceMng(), "ParallelExchanger: sending values with AllToAll");
335 pm->allToAllVariable(send_buf, send_counts, send_indexes, recv_buf, recv_counts, recv_indexes);
336 }
337 // Copies the received data back into the corresponding message.
338 for (SerializeMessage* comm : m_recv_serialize_infos) {
339 auto* sbuf = comm->trueSerializer();
340 Int32 rank = comm->destRank();
341 if (is_verbose)
342 info() << "RECV rank=" << rank << " size=" << recv_counts[rank]
343 << " idx=" << recv_indexes[rank];
344 ByteArrayView orig_buf(recv_counts[rank], &recv_buf[recv_indexes[rank]]);
345
346 sbuf->preallocate(orig_buf.size());
347 sbuf->globalBuffer().copy(orig_buf);
348 sbuf->setFromSizes();
349 }
350}
351
352/*---------------------------------------------------------------------------*/
353/*---------------------------------------------------------------------------*/
354
355ISerializeMessage* ParallelExchanger::
356messageToSend(Integer i)
357{
358 return m_send_serialize_infos[i];
359}
360
361/*---------------------------------------------------------------------------*/
362/*---------------------------------------------------------------------------*/
363
364ISerializeMessage* ParallelExchanger::
365messageToReceive(Integer i)
366{
367 return m_recv_serialize_infos[i];
368}
369
370/*---------------------------------------------------------------------------*/
371/*---------------------------------------------------------------------------*/
372
373void ParallelExchanger::
374setVerbosityLevel(Int32 v)
375{
376 if (v < 0)
377 v = 0;
379}
380
381/*---------------------------------------------------------------------------*/
382/*---------------------------------------------------------------------------*/
383
384void ParallelExchanger::
385setName(const String& name)
386{
387 m_name = name;
388}
389
390/*---------------------------------------------------------------------------*/
391/*---------------------------------------------------------------------------*/
392
393namespace
394{
395 class SortFunctor
396 {
397 public:
398
409 bool operator()(const ISerializeMessage* a, const ISerializeMessage* b)
410 {
411 const int nb_phase = 4;
412 int phase1 = a->destination().value() % nb_phase;
413 int phase2 = b->destination().value() % nb_phase;
414 if (phase1 != phase2)
415 return phase1 < phase2;
416 if (a->destination() != b->destination())
417 return a->destination() < b->destination();
418 if (a->isSend() != b->isSend())
419 return (a->isSend() ? false : true);
420 return a->source() < b->source();
421 }
422 };
423} // namespace
424
425/*---------------------------------------------------------------------------*/
426/*---------------------------------------------------------------------------*/
427
431void ParallelExchanger::
432_processExchangeWithControl(Int32 max_pending_message)
433{
434 // All messages are in 'm_comms_buf'.
435 // We copy them into 'sorted_messages' so that they are sorted.
436
437 auto message_list{ m_parallel_mng->createSerializeMessageListRef() };
438
440 std::sort(sorted_messages.begin(), sorted_messages.end(), SortFunctor{});
441
442 Integer position = 0;
443 // We must add at least a minimum number of messages to avoid blocking.
444 // The minimum is 2 to ensure there is at least one receive and one send
445 // but it is better to put more to avoid degrading performance too much.
446 max_pending_message = math::max(4, max_pending_message);
447
448 Integer nb_message = sorted_messages.size();
449 Integer nb_to_add = max_pending_message;
450
451 Int32 verbosity_level = m_verbosity_level;
452
453 if (verbosity_level >= 1)
454 info() << "ParallelExchanger " << m_name << " : process exchange WITH CONTROL"
455 << " nb_message=" << nb_message << " max_pending=" << max_pending_message;
456
457 while (position < nb_message) {
458 for (Integer i = 0; i < nb_to_add; ++i) {
459 if (position >= nb_message)
460 break;
461 ISerializeMessage* message = sorted_messages[position];
462 if (verbosity_level >= 2)
463 info() << "Add Message p=" << position << " is_send?=" << message->isSend() << " source=" << message->source()
464 << " dest=" << message->destination();
465 message_list->addMessage(message);
466 ++position;
467 }
468 // If there are no more messages left, we perform a WaitAll to wait*
469 // that all remaining messages are finished.
470 if (position >= nb_message) {
471 message_list->waitMessages(Parallel::WaitAll);
472 break;
473 }
474 // The number of finished messages indicates how many messages will need to be
475 // added to the list for the next iteration.
476 Integer nb_done = message_list->waitMessages(Parallel::WaitSome);
477 if (verbosity_level >= 2)
478 info() << "Wait nb_done=" << nb_done;
479 if (nb_done == (-1))
480 nb_done = max_pending_message;
481 nb_to_add = nb_done;
482 }
483}
484
485/*---------------------------------------------------------------------------*/
486/*---------------------------------------------------------------------------*/
487
489createParallelExchangerImpl(Ref<IParallelMng> pm)
490{
492}
493
494/*---------------------------------------------------------------------------*/
495/*---------------------------------------------------------------------------*/
496
497} // End namespace Arcane
498
499/*---------------------------------------------------------------------------*/
500/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
Various mathematical functions.
Integer size() const
Number of elements in the vector.
iterator begin()
Iterator over the first element of the array.
iterator end()
Iterator over the first element after the end of the array.
constexpr Integer size() const noexcept
Number of elements in the array.
Interface of the parallelism manager for a subdomain.
virtual bool isSend() const =0
true if it should send, false if it should receive
virtual MessageRank destination() const =0
Destination rank (if isSend() is true) or sender.
virtual MessageRank source() const =0
Message sender rank.
Int32 value() const
Rank value.
Definition MessageRank.h:76
Options for IParallelMng::processExchange().
Int32 maxPendingMessage() const
Maximum number of pending messages.
void setExchangeMode(eExchangeMode mode)
Sets the exchange mode.
eExchangeMode exchangeMode() const
Specified exchange mode.
Information exchange between processors.
Int32UniqueArray m_send_ranks
List of subdomains to send.
String m_name
Instance name used for display.
UniqueArray< SerializeMessage * > m_recv_serialize_infos
List of messages to receive.
void processExchange() override
Performs the exchange using the default options of ParallelExchangerOptions.
Int32 m_verbosity_level
Verbosity level.
Timer m_timer
Timer to measure time spent in exchanges.
String name() const override
Instance name.
UniqueArray< SerializeMessage * > m_send_serialize_infos
List of messages to send.
Int32UniqueArray m_recv_ranks
List of subdomains to receive.
eExchangeMode m_exchange_mode
Exchange mode.
UniqueArray< ISerializeMessage * > m_comms_buf
List of messages to send and receive.
Reference to an instance.
Message using a SerializeBuffer.
Sentinel for the timer. The sentinel associated with a timer allows it to be triggered upon its const...
Definition Timer.h:90
TraceMessage info() const
Flow for an information message.
1D data vector with value semantics (STL style).
T max(const T &a, const T &b, const T &c)
Returns the maximum of three elements.
Definition MathUtils.h:407
@ WaitSome
Wait until at least one message in the list is processed.
String getCurrentDateTime()
Current date and time in ISO 8601 format.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
Integer arcaneCheckArraySize(unsigned long long size)
Checks that size can be converted into an 'Integer' to serve as the size of an array....
ArrayView< Byte > ByteArrayView
C equivalent of a 1D array of characters.
Definition UtilsTypes.h:447
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
Definition UtilsTypes.h:482
UniqueArray< Byte > ByteUniqueArray
Dynamic 1D array of characters.
Definition UtilsTypes.h:335
UniqueArray< Int32 > Int32UniqueArray
Dynamic 1D array of 32-bit integers.
Definition UtilsTypes.h:341
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
UniqueArray< Integer > IntegerUniqueArray
Dynamic 1D array of integers.
Definition UtilsTypes.h:347
std::int32_t Int32
Signed integer type of 32 bits.