14#include "arcane/impl/ParallelExchanger.h"
16#include "arcane/utils/NotSupportedException.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/PlatformUtils.h"
20#include "arcane/MathUtils.h"
21#include "arcane/IParallelMng.h"
22#include "arcane/SerializeBuffer.h"
23#include "arcane/SerializeMessage.h"
24#include "arcane/Timer.h"
25#include "arcane/ISerializeMessageList.h"
39ParallelExchanger(IParallelMng* pm)
48ParallelExchanger(Ref<IParallelMng> pm)
49: TraceAccessor(pm->traceMng())
51, m_timer(pm->timerMng(),
"ParallelExchangerTimer",Timer::TimerReal)
53 String use_collective_str = platform::getEnvironmentVariable(
"ARCANE_PARALLEL_EXCHANGER_USE_COLLECTIVE");
54 if (use_collective_str==
"1" || use_collective_str==
"TRUE")
55 m_exchange_mode = EM_Collective;
64 for(
auto* buf : m_comms_buf )
67 delete m_own_send_message;
68 delete m_own_recv_message;
74IParallelMng* ParallelExchanger::
77 return m_parallel_mng.get();
83bool ParallelExchanger::
84initializeCommunicationsMessages()
89 std::copy(std::begin(m_send_ranks),std::end(m_send_ranks),
93 Integer nb_rank = m_parallel_mng->commSize();
99 Int32
my_rank = m_parallel_mng->commRank();
102 for( Integer i=0; i<nb_rank; ++i ){
106 for( Integer z=0; z<
nb_comm; ++z ){
118 _initializeCommunicationsMessages();
126void ParallelExchanger::
131 _initializeCommunicationsMessages();
137void ParallelExchanger::
138_initializeCommunicationsMessages()
140 if (m_verbosity_level>=1){
141 info() <<
"ParallelExchanger " << m_name <<
" : nb_send=" << m_send_ranks.size()
142 <<
" nb_recv=" << m_recv_ranks.size();
143 if (m_verbosity_level>=2){
144 info() <<
"ParallelExchanger " << m_name <<
" : send=" << m_send_ranks;
145 info() <<
"ParallelExchanger " << m_name <<
" : recv=" << m_recv_ranks;
149 Int32 my_rank = m_parallel_mng->commRank();
151 for(
Int32 msg_rank : m_send_ranks ){
152 auto* comm =
new SerializeMessage(my_rank,msg_rank,ISerializeMessage::MT_Send);
155 if (my_rank==msg_rank)
156 m_own_send_message = comm;
158 m_comms_buf.add(comm);
159 m_send_serialize_infos.add(comm);
166void ParallelExchanger::
171 processExchange(options);
177void ParallelExchanger::
180 if (m_verbosity_level>=1)
181 info() <<
"ParallelExchanger " << m_name <<
": ProcessExchange (begin)"
182 <<
" date=" << platform::getCurrentDateTime();
186 _processExchange(options);
189 if (m_verbosity_level>=1)
190 info() <<
"ParallelExchanger " << m_name <<
": ProcessExchange (end)"
191 <<
" total_time=" << m_timer.lastActivationTime();
197void ParallelExchanger::
200 if (m_verbosity_level>=1){
205 if (m_verbosity_level>=2)
208 info() <<
"ParallelExchanger " << m_name <<
": ProcessExchange"
209 <<
" total_size=" << total_size <<
" nb_message=" << m_comms_buf.size();
212 bool use_all_to_all =
false;
214 use_all_to_all =
true;
219 Int32 my_rank = m_parallel_mng->commRank();
220 for(
Int32 msg_rank : m_recv_ranks ){
221 auto* comm =
new SerializeMessage(my_rank,msg_rank,ISerializeMessage::MT_Recv);
224 if (my_rank==msg_rank)
225 m_own_recv_message = comm;
227 m_comms_buf.add(comm);
228 m_recv_serialize_infos.add(comm);
232 _processExchangeCollective();
236 _processExchangeWithControl(max_pending);
238 m_parallel_mng->processMessages(m_comms_buf);
240 if (m_own_send_message && m_own_recv_message){
241 m_own_recv_message->serializer()->copy(m_own_send_message->serializer());
246 for( SerializeMessage* comm : m_recv_serialize_infos )
247 comm->serializer()->setMode(ISerializer::ModeGet);
253void ParallelExchanger::
254_processExchangeCollective()
256 info() <<
"Using collective exchange in ParallelExchanger";
258 IParallelMng* pm = m_parallel_mng.get();
267 for( SerializeMessage* comm : m_send_serialize_infos ){
268 auto* sbuf = comm->trueSerializer();
269 Span<Byte> val_buf = sbuf->globalBuffer();
270 Int32 rank = comm->destRank();
276 Timer::SimplePrinter sp(traceMng(),
"ParallelExchanger: sending sizes with AllToAll");
277 pm->allToAll(send_counts,recv_counts,1);
284 Int32 total_send = 0;
285 Int32 total_recv = 0;
286 Int64 int64_total_send = 0;
287 Int64 int64_total_recv = 0;
288 for( Integer i=0; i<nb_rank; ++i ){
289 send_indexes[i] = total_send;
290 recv_indexes[i] = total_recv;
291 total_send += send_counts[i];
292 total_recv += recv_counts[i];
293 int64_total_send += send_counts[i];
294 int64_total_recv += recv_counts[i];
298 if (int64_total_send!=total_send)
299 ARCANE_FATAL(
"Message to send is too big size={0} max=2^31",int64_total_send);
300 if (int64_total_recv!=total_recv)
301 ARCANE_FATAL(
"Message to receive is too big size={0} max=2^31",int64_total_recv);
305 bool is_verbose = (m_verbosity_level>=1);
306 if (m_verbosity_level>=2){
307 for( Integer i=0; i<nb_rank; ++i ){
308 if (send_counts[i]!=0 || recv_counts[i]!=0)
309 info() <<
"INFOS: rank=" << i <<
" send_count=" << send_counts[i]
310 <<
" send_idx=" << send_indexes[i]
311 <<
" recv_count=" << recv_counts[i]
312 <<
" recv_idx=" << recv_indexes[i];
317 for( SerializeMessage* comm : m_send_serialize_infos ){
318 auto* sbuf = comm->trueSerializer();
319 Span<Byte> val_buf = sbuf->globalBuffer();
320 Int32 rank = comm->destRank();
322 info() <<
"SEND rank=" << rank <<
" size=" << send_counts[rank]
323 <<
" idx=" << send_indexes[rank]
324 <<
" buf_size=" << val_buf.size();
325 ByteArrayView dest_buf(send_counts[rank],&send_buf[send_indexes[rank]]);
326 dest_buf.copy(val_buf);
330 info() <<
"AllToAllVariable total_send=" << total_send
331 <<
" total_recv=" << total_recv;
334 Timer::SimplePrinter sp(traceMng(),
"ParallelExchanger: sending values with AllToAll");
335 pm->allToAllVariable(send_buf,send_counts,send_indexes,recv_buf,recv_counts,recv_indexes);
338 for( SerializeMessage* comm : m_recv_serialize_infos ){
339 auto* sbuf = comm->trueSerializer();
340 Int32 rank = comm->destRank();
342 info() <<
"RECV rank=" << rank <<
" size=" << recv_counts[rank]
343 <<
" idx=" << recv_indexes[rank];
344 ByteArrayView orig_buf(recv_counts[rank],&recv_buf[recv_indexes[rank]]);
346 sbuf->preallocate(orig_buf.size());
347 sbuf->globalBuffer().copy(orig_buf);
348 sbuf->setFromSizes();
356messageToSend(Integer i)
358 return m_send_serialize_infos[i];
365messageToReceive(Integer i)
367 return m_recv_serialize_infos[i];
373void ParallelExchanger::
374setVerbosityLevel(Int32 v)
378 m_verbosity_level = v;
384void ParallelExchanger::
385setName(
const String& name)
415 if (a->destination() !=
b->destination())
416 return a->destination() <
b->destination();
417 if (a->isSend() !=
b->isSend())
418 return (a->isSend() ?
false :
true);
419 return a->source() <
b->source();
429void ParallelExchanger::
435 auto message_list {m_parallel_mng->createSerializeMessageListRef()};
440 Integer position = 0;
452 info() <<
"ParallelExchanger " << m_name <<
" : process exchange WITH CONTROL"
461 info() <<
"Add Message p=" << position <<
" is_send?=" << message->
isSend() <<
" source=" << message->
source()
476 info() <<
"Wait nb_done=" <<
nb_done;
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
virtual Int32 commSize() const =0
Nombre d'instance dans le communicateur.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Options pour IParallelMng::processExchange().
Int32 maxPendingMessage() const
Nombre maximal de messages en vol.
void setExchangeMode(eExchangeMode mode)
Positionne le mode d'échange.
eExchangeMode
Mode d'échange.
eExchangeMode exchangeMode() const
Mode d'échange spécifié
Echange d'informations entre processeurs.
Message utilisant un SerializeBuffer.
Sentinelle pour le timer. La sentinelle associée à un timer permet de déclancher celui-ci au moment d...
Vue constante d'un tableau de type T.
Interface d'un message de sérialisation entre IMessagePassingMng.
virtual MessageRank destination() const =0
Rang du destinataire (si isSend() est vrai) ou de l'envoyeur.
virtual bool isSend() const =0
true s'il faut envoyer, false s'il faut recevoir
virtual MessageRank source() const =0
Rang de l'envoyeur du message.
Chaîne de caractères unicode.
Vecteur 1D de données avec sémantique par valeur (style STL).
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Integer arcaneCheckArraySize(unsigned long long size)
Vérifie que size peut être converti dans un 'Integer' pour servir de taille à un tableau....
UniqueArray< Byte > ByteUniqueArray
Tableau dynamique à une dimension de caractères.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
ArrayView< Byte > ByteArrayView
Equivalent C d'un tableau à une dimension de caractères.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.