14#include "arcane/impl/ParallelExchanger.h"
16#include "arcane/utils/NotSupportedException.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/PlatformUtils.h"
21#include "arcane/core/IParallelMng.h"
22#include "arcane/core/SerializeBuffer.h"
23#include "arcane/core/Timer.h"
24#include "arcane/core/ISerializeMessageList.h"
25#include "arcane/core/internal/SerializeMessage.h"
48ParallelExchanger(Ref<IParallelMng> pm)
49: TraceAccessor(pm->traceMng())
51, m_timer(pm->timerMng(),
"ParallelExchangerTimer",Timer::TimerReal)
53 String use_collective_str = platform::getEnvironmentVariable(
"ARCANE_PARALLEL_EXCHANGER_USE_COLLECTIVE");
54 if (use_collective_str==
"1" || use_collective_str==
"TRUE")
55 m_exchange_mode = EM_Collective;
64 for(
auto* buf : m_comms_buf )
67 delete m_own_send_message;
68 delete m_own_recv_message;
74IParallelMng* ParallelExchanger::
77 return m_parallel_mng.get();
83bool ParallelExchanger::
84initializeCommunicationsMessages()
88 gather_input_send_ranks[0] = nb_send_rank;
90 std::begin(gather_input_send_ranks)+1);
93 Integer nb_rank = m_parallel_mng->commSize();
94 m_parallel_mng->allGatherVariable(gather_input_send_ranks,
95 gather_output_send_ranks);
99 Int32 my_rank = m_parallel_mng->commRank();
102 for(
Integer i=0; i<nb_rank; ++i ){
103 Integer nb_comm = gather_output_send_ranks[gather_index];
104 total_comm_rank += nb_comm;
106 for(
Integer z=0; z<nb_comm; ++z ){
107 Integer current_rank = gather_output_send_ranks[gather_index+z];
108 if (current_rank==my_rank)
111 gather_index += nb_comm;
115 if (total_comm_rank==0)
118 _initializeCommunicationsMessages();
126void ParallelExchanger::
131 _initializeCommunicationsMessages();
137void ParallelExchanger::
138_initializeCommunicationsMessages()
140 if (m_verbosity_level>=1){
141 info() <<
"ParallelExchanger " << m_name <<
" : nb_send=" << m_send_ranks.size()
142 <<
" nb_recv=" << m_recv_ranks.size();
143 if (m_verbosity_level>=2){
144 info() <<
"ParallelExchanger " << m_name <<
" : send=" << m_send_ranks;
145 info() <<
"ParallelExchanger " << m_name <<
" : recv=" << m_recv_ranks;
149 Int32 my_rank = m_parallel_mng->commRank();
151 for( Int32 msg_rank : m_send_ranks ){
152 auto* comm =
new SerializeMessage(my_rank,msg_rank,ISerializeMessage::MT_Send);
155 if (my_rank==msg_rank)
156 m_own_send_message = comm;
158 m_comms_buf.add(comm);
159 m_send_serialize_infos.add(comm);
166void ParallelExchanger::
177void ParallelExchanger::
181 info() <<
"ParallelExchanger " <<
m_name <<
": ProcessExchange (begin)"
186 _processExchange(options);
190 info() <<
"ParallelExchanger " <<
m_name <<
": ProcessExchange (end)"
191 <<
" total_time=" <<
m_timer.lastActivationTime();
197void ParallelExchanger::
200 if (m_verbosity_level>=1){
201 Int64 total_size = 0;
203 Int64 message_size = comm->trueSerializer()->totalSize();
204 total_size += message_size;
205 if (m_verbosity_level>=2)
206 info() <<
"Send rank=" << comm->destination() <<
" size=" << message_size;
208 info() <<
"ParallelExchanger " << m_name <<
": ProcessExchange"
209 <<
" total_size=" << total_size <<
" nb_message=" << m_comms_buf.size();
212 bool use_all_to_all =
false;
214 use_all_to_all =
true;
219 Int32 my_rank = m_parallel_mng->commRank();
220 for( Int32 msg_rank : m_recv_ranks ){
221 auto* comm =
new SerializeMessage(my_rank,msg_rank,ISerializeMessage::MT_Recv);
224 if (my_rank==msg_rank)
225 m_own_recv_message = comm;
227 m_comms_buf.add(comm);
228 m_recv_serialize_infos.add(comm);
232 _processExchangeCollective();
236 _processExchangeWithControl(max_pending);
238 m_parallel_mng->processMessages(m_comms_buf);
240 if (m_own_send_message && m_own_recv_message){
241 m_own_recv_message->serializer()->copy(m_own_send_message->serializer());
246 for( SerializeMessage* comm : m_recv_serialize_infos )
247 comm->serializer()->setMode(ISerializer::ModeGet);
253void ParallelExchanger::
254_processExchangeCollective()
256 info() <<
"Using collective exchange in ParallelExchanger";
258 IParallelMng* pm = m_parallel_mng.get();
259 Int32 nb_rank = pm->commSize();
267 for( SerializeMessage* comm : m_send_serialize_infos ){
268 auto* sbuf = comm->trueSerializer();
269 Span<Byte> val_buf = sbuf->globalBuffer();
270 Int32 rank = comm->destRank();
276 Timer::SimplePrinter sp(traceMng(),
"ParallelExchanger: sending sizes with AllToAll");
277 pm->allToAll(send_counts,recv_counts,1);
284 Int32 total_send = 0;
285 Int32 total_recv = 0;
286 Int64 int64_total_send = 0;
287 Int64 int64_total_recv = 0;
288 for( Integer i=0; i<nb_rank; ++i ){
289 send_indexes[i] = total_send;
290 recv_indexes[i] = total_recv;
291 total_send += send_counts[i];
292 total_recv += recv_counts[i];
293 int64_total_send += send_counts[i];
294 int64_total_recv += recv_counts[i];
298 if (int64_total_send!=total_send)
299 ARCANE_FATAL(
"Message to send is too big size={0} max=2^31",int64_total_send);
300 if (int64_total_recv!=total_recv)
301 ARCANE_FATAL(
"Message to receive is too big size={0} max=2^31",int64_total_recv);
305 bool is_verbose = (m_verbosity_level>=1);
306 if (m_verbosity_level>=2){
307 for( Integer i=0; i<nb_rank; ++i ){
308 if (send_counts[i]!=0 || recv_counts[i]!=0)
309 info() <<
"INFOS: rank=" << i <<
" send_count=" << send_counts[i]
310 <<
" send_idx=" << send_indexes[i]
311 <<
" recv_count=" << recv_counts[i]
312 <<
" recv_idx=" << recv_indexes[i];
317 for( SerializeMessage* comm : m_send_serialize_infos ){
318 auto* sbuf = comm->trueSerializer();
319 Span<Byte> val_buf = sbuf->globalBuffer();
320 Int32 rank = comm->destRank();
322 info() <<
"SEND rank=" << rank <<
" size=" << send_counts[rank]
323 <<
" idx=" << send_indexes[rank]
324 <<
" buf_size=" << val_buf.size();
325 ByteArrayView dest_buf(send_counts[rank],&send_buf[send_indexes[rank]]);
326 dest_buf.copy(val_buf);
330 info() <<
"AllToAllVariable total_send=" << total_send
331 <<
" total_recv=" << total_recv;
334 Timer::SimplePrinter sp(traceMng(),
"ParallelExchanger: sending values with AllToAll");
335 pm->allToAllVariable(send_buf,send_counts,send_indexes,recv_buf,recv_counts,recv_indexes);
338 for( SerializeMessage* comm : m_recv_serialize_infos ){
339 auto* sbuf = comm->trueSerializer();
340 Int32 rank = comm->destRank();
342 info() <<
"RECV rank=" << rank <<
" size=" << recv_counts[rank]
343 <<
" idx=" << recv_indexes[rank];
344 ByteArrayView orig_buf(recv_counts[rank],&recv_buf[recv_indexes[rank]]);
346 sbuf->preallocate(orig_buf.size());
347 sbuf->globalBuffer().copy(orig_buf);
348 sbuf->setFromSizes();
373void ParallelExchanger::
374setVerbosityLevel(
Int32 v)
384void ParallelExchanger::
410 const int nb_phase = 4;
411 int phase1 = a->destination().value() % nb_phase;
413 if (phase1 != phase2)
414 return phase1<phase2;
417 if (a->isSend() != b->
isSend())
418 return (a->isSend() ?
false :
true);
419 return a->source() < b->
source();
429void ParallelExchanger::
430_processExchangeWithControl(
Int32 max_pending_message)
435 auto message_list {m_parallel_mng->createSerializeMessageListRef()};
438 std::sort(sorted_messages.
begin(),sorted_messages.
end(),SortFunctor{});
444 max_pending_message =
math::max(4,max_pending_message);
447 Integer nb_to_add = max_pending_message;
451 if (verbosity_level>=1)
452 info() <<
"ParallelExchanger " <<
m_name <<
" : process exchange WITH CONTROL"
453 <<
" nb_message=" << nb_message <<
" max_pending=" << max_pending_message;
455 while(position<nb_message){
456 for(
Integer i=0; i<nb_to_add; ++i ){
457 if (position>=nb_message)
460 if (verbosity_level>=2)
461 info() <<
"Add Message p=" << position <<
" is_send?=" << message->
isSend() <<
" source=" << message->
source()
463 message_list->addMessage(message);
468 if (position>=nb_message){
469 message_list->waitMessages(Parallel::WaitAll);
475 if (verbosity_level>=2)
476 info() <<
"Wait nb_done=" << nb_done;
478 nb_done = max_pending_message;
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Fonctions mathématiques diverses.
Integer size() const
Nombre d'éléments du vecteur.
iterator end()
Itérateur sur le premier élément après la fin du tableau.
iterator begin()
Itérateur sur le premier élément du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Interface du gestionnaire de parallélisme pour un sous-domaine.
Interface d'un message de sérialisation entre IMessagePassingMng.
virtual bool isSend() const =0
true s'il faut envoyer, false s'il faut recevoir
virtual MessageRank destination() const =0
Rang du destinataire (si isSend() est vrai) ou de l'envoyeur.
virtual MessageRank source() const =0
Rang de l'envoyeur du message.
Int32 value() const
Valeur du rang.
Options pour IParallelMng::processExchange().
Int32 maxPendingMessage() const
Nombre maximal de messages en vol.
void setExchangeMode(eExchangeMode mode)
Positionne le mode d'échange.
eExchangeMode
Mode d'échange.
eExchangeMode exchangeMode() const
Mode d'échange spécifié
Echange d'informations entre processeurs.
Int32UniqueArray m_send_ranks
Liste des sous-domaines à envoyer.
String m_name
Nom de l'instance utilisé pour l'affichage.
UniqueArray< SerializeMessage * > m_recv_serialize_infos
Liste des message à recevoir.
void processExchange() override
Effectue l'échange avec les options par défaut de ParallelExchangerOptions.
Int32 m_verbosity_level
Niveau de verbosité
Timer m_timer
Timer pour mesurer le temps passé dans les échanges.
String name() const override
Nom de l'instance.
UniqueArray< SerializeMessage * > m_send_serialize_infos
Liste des message à recevoir.
Int32UniqueArray m_recv_ranks
Liste des sous-domaines à recevoir.
eExchangeMode m_exchange_mode
Mode d'échange.
UniqueArray< ISerializeMessage * > m_comms_buf
Liste des message à envoyer et recevoir.
Référence à une instance.
Message utilisant un SerializeBuffer.
Chaîne de caractères unicode.
Sentinelle pour le timer. La sentinelle associée à un timer permet de déclancher celui-ci au moment d...
TraceMessage info() const
Flot pour un message d'information.
Vecteur 1D de données avec sémantique par valeur (style STL).
T max(const T &a, const T &b, const T &c)
Retourne le maximum de trois éléments.
@ WaitSome
Attend que tous les messages de la liste soient traités.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Integer arcaneCheckArraySize(unsigned long long size)
Vérifie que size peut être converti dans un 'Integer' pour servir de taille à un tableau....
ArrayView< Byte > ByteArrayView
Equivalent C d'un tableau à une dimension de caractères.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
UniqueArray< Byte > ByteUniqueArray
Tableau dynamique à une dimension de caractères.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
UniqueArray< Integer > IntegerUniqueArray
Tableau dynamique à une dimension d'entiers.
std::int32_t Int32
Type entier signé sur 32 bits.