14#include "arcane/mesh/BasicParticleExchanger.h"
16#include "arcane/core/internal/SerializeMessage.h"
38BasicParticleExchanger::
39BasicParticleExchanger(
const ServiceBuildInfo& sbi)
40: ArcaneBasicParticleExchangerObject(sbi)
47BasicParticleExchanger::
48~BasicParticleExchanger()
51 if (!m_pending_messages.empty() || !m_waiting_messages.empty())
52 pwarning() << String::format(
"Pending or waiting messages nb_pending={0} nb_waiting={1}",
53 m_pending_messages.size(),m_waiting_messages.size());
60void BasicParticleExchanger::
65 m_item_family = item_family;
73 info() <<
"Initialize BasicParticleExchanger family=" << item_family->
name();
80void BasicParticleExchanger::
83 for(
auto msg : m_accumulate_infos )
85 m_accumulate_infos.clear();
91void BasicParticleExchanger::
92beginNewExchange(
Integer i_nb_particle)
95 String function_id =
"BasicParticleExchanger::beginNewExchange >>> ";
99 m_debug_exchange_items_level =
options()->debugExchangeItemsLevel();
104 m_last_nb_to_exchange = 0;
105 m_exchange_finished =
false;
106 m_print_info =
false;
107 m_current_nb_reduce = 0;
108 m_nb_particle_send = 0;
111 Int64 nb_particle = i_nb_particle;
112 Int64 min_nb_particle = 0;
113 Int64 max_nb_particle = 0;
114 Int64 nb_total_particle = 0;
118 nb_total_particle,min_rank,max_rank);
120 if (m_verbose_level>=1)
121 info() << function_id <<
"** NB PARTICLES IN VOL total=" << nb_total_particle
122 <<
" min=" << min_nb_particle <<
" max=" << max_nb_particle
123 <<
" min_rank=" << min_rank <<
" max_rank=" << max_rank
126 m_nb_total_particle_finish_exchange = 0;
140void BasicParticleExchanger::
141sendItems(
Integer nb_particle_finish_exchange,
145 ARCANE_UNUSED(nb_particle_finish_exchange);
147 m_nb_particle_send = local_ids.
size();
150 _generateSendItems(local_ids,sub_domains_to_send);
152 info(5) << A_FUNCINFO <<
"sendItems " <<
m_timer->lastActivationTime();
154 _sendPendingMessages();
160bool BasicParticleExchanger::
161exchangeItems(
Integer nb_particle_finish_exchange,
168 sendItems(nb_particle_finish_exchange,local_ids,sub_domains_to_send);
170 if (!item_group.
null())
173 return _waitMessages(0,item_group,
nullptr,functor);
179bool BasicParticleExchanger::
180exchangeItems(
Integer nb_particle_finish_exchange,
187 sendItems(nb_particle_finish_exchange,local_ids,sub_domains_to_send);
189 if (new_particle_local_ids)
190 new_particle_local_ids->
clear();
192 return _waitMessages(0,
ItemGroup(),new_particle_local_ids,functor);
198void BasicParticleExchanger::
206 String func_name(
"BasicParticleExchanger::sendItems()");
209 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
211 Integer nb_connected_sub_domain = communicating_sub_domains.
size();
216 m_accumulate_infos.clear();
217 m_accumulate_infos.resize(nb_connected_sub_domain);
218 for(
Integer i=0; i<nb_connected_sub_domain; ++i ){
219 m_accumulate_infos[i] =
new SerializeMessage(m_rank,communicating_sub_domains[i],
220 ISerializeMessage::MT_Send);
223 if (m_rank==0 && i==0){
224 warning() <<
" WRONG MESSAGE";
226 ISerializeMessage::MT_Recv);
227 m_pending_messages.add(sm);
232 _addItemsToSend(local_ids,sub_domains_to_send,communicating_sub_domains,ids_to_send);
234 if (m_debug_exchange_items_level>=1){
235 info() <<
"-- Subdomain " << m_rank <<
". NB to send: " << local_ids.
size()
236 <<
" NB connected subdomain: " << nb_connected_sub_domain;
237 debug() <<
"NB connected subdomain for " << m_rank <<
" : " << m_accumulate_infos.size();
238 for( Integer i=0, s=m_accumulate_infos.size(); i<s; ++i ){
239 debug() <<
"NB for the subdomain " << m_accumulate_infos[i]->destRank()
240 <<
" " << ids_to_send[i].size();
247 for( Integer j=0; j<nb_connected_sub_domain; ++j ){
248 ISerializeMessage* sm = m_accumulate_infos[j];
252 _serializeMessage(sm,ids_to_send[j],items_to_send_uid,
253 items_to_send_cells_uid);
255 m_pending_messages.add(sm);
258 auto* recv_sm =
new SerializeMessage(m_rank,sm->destination().value(),
259 ISerializeMessage::MT_Recv);
260 m_pending_messages.add(recv_sm);
263 m_accumulate_infos.
clear();
265 m_item_family->toParticleFamily()->removeParticles(local_ids);
266 m_item_family->endUpdate();
272void BasicParticleExchanger::
273_addItemsToSend(Int32ConstArrayView local_ids,
274 Int32ConstArrayView sub_domains_to_send,
275 Int32ConstArrayView communicating_sub_domains,
276 UniqueArray< SharedArray<Int32> >& ids_to_send)
278 const Int32 debug_exchange_items_level = m_debug_exchange_items_level;
279 Int32 nb_connected_sub_domain = ids_to_send.size();
282 Int32 id_size = local_ids.size();
283 for( Integer i=0; i<id_size; ++i ){
284 Int32 item_local_id = local_ids[i];
285 Int32 sd_to_send = sub_domains_to_send[i];
286 if (sd_to_send==m_rank)
288 ARCANE_FATAL(
"The entity with local index {0} should not be sent to its own subdomain",
293 Integer sd_index = nb_connected_sub_domain;
294 for( Integer i_sd=0; i_sd<nb_connected_sub_domain; ++i_sd )
295 if (sd_to_send==communicating_sub_domains[i_sd]){
299 if (sd_index==nb_connected_sub_domain)
301 ids_to_send[sd_index].add(item_local_id);
302 if (debug_exchange_items_level>=1)
303 pinfo() <<
"ADD ITEM TO SEND lid=" << item_local_id <<
" sd_index=" << sd_index
304 <<
" sd=" << communicating_sub_domains[sd_index] <<
" n=" << ids_to_send[sd_index].size()
305 <<
" begin=" << ids_to_send[sd_index].data();
307 if (debug_exchange_items_level>=1)
308 for( Integer i=0; i<nb_connected_sub_domain; ++i )
309 pinfo() <<
"SEND INFO sd_index=" << i
310 <<
" sd=" << communicating_sub_domains[i] <<
" n=" << ids_to_send[i].size()
311 <<
" begin=" << ids_to_send[i].data();
313 if ((debug_exchange_items_level>=1 && m_print_info) || debug_exchange_items_level>=2){
314 IParallelMng* pm = m_parallel_mng;
315 Int32 rank = pm->commRank();
316 ParticleInfoListView items(m_item_family);
317 Integer nb_print = math::min(5,id_size);
318 for( Integer i=0; i<nb_print; ++i ){
319 Particle part(items[local_ids[i]]);
320 pinfo() <<
" RANK=" << rank <<
" LID=" << local_ids[i] <<
" SD=" << sub_domains_to_send[i]
321 <<
" uid=" << part.uniqueId() <<
" cell=" << ItemPrinter(part.cell());
330bool BasicParticleExchanger::
333 return _waitMessages(nb_pending_particle,
ItemGroup(),new_particle_local_ids,functor);
339bool BasicParticleExchanger::
343 String func_name =
"BasicParticleExchanger::waitMessages";
344 _waitMessages(item_group,new_particle_local_ids,functor);
346 bool do_reduce = m_current_nb_reduce>m_last_nb_reduce;
347 if (m_max_nb_message_without_reduce!=(-1))
348 do_reduce |= m_current_nb_reduce>m_max_nb_message_without_reduce;
351 Int64 nb_to_exchange = 0;
355 Int64 current_exchange = m_nb_particle_send+nb_pending_particle;
358 info(4) << func_name <<
"TimeReduce=" << m_timer->lastActivationTime()
359 <<
" nbtoexchange=" << nb_to_exchange;
360 m_exchange_finished = (nb_to_exchange==0);
361 if (nb_to_exchange>0 && m_last_nb_to_exchange==nb_to_exchange && m_nb_loop>300){
364 m_last_nb_to_exchange = nb_to_exchange;
367 ++m_current_nb_reduce;
368 debug() << func_name <<
" ** RETURN EXCHANGE m_exchange_finished: " << m_exchange_finished;
369 if (m_exchange_finished){
370 m_last_nb_reduce = m_current_nb_reduce - 4;
371 if (m_verbose_level>=1)
372 info() << func_name <<
" exchange finished "
373 <<
" n=" << m_current_nb_reduce
374 <<
" date=" << platform::getCurrentDateTime();
376 return m_exchange_finished;
382void BasicParticleExchanger::
383_waitMessages(ItemGroup item_group,Int32Array* new_particle_local_ids,IFunctor* functor)
387 Timer::Sentry ts(m_timer);
388 functor->executeFunctor();
390 m_total_time_functor += m_timer->lastActivationTime();
391 if (m_debug_exchange_items_level>=1)
392 info() <<
"TimeFunctor: current=" << m_timer->lastActivationTime()
393 <<
" total=" << m_total_time_functor;
397 Timer::Sentry ts(m_timer);
398 m_message_list->waitMessages(Parallel::WaitAll);
400 m_total_time_waiting += m_timer->lastActivationTime();
401 if (m_debug_exchange_items_level>=1)
402 info() <<
"TimeWaiting: current=" << m_timer->lastActivationTime()
403 <<
" total=" << m_total_time_waiting;
407 UniqueArray<ISerializeMessage*> current_messages(m_waiting_messages);
408 m_waiting_messages.clear();
414 for( ISerializeMessage* sm : current_messages ){
416 _deserializeMessage(sm,items_to_create_unique_id,items_to_create_cells_unique_id,
417 items_to_create_local_id,items_to_create_cells_local_id,
418 item_group,new_particle_local_ids);
421 if (!m_waiting_messages.empty())
422 ARCANE_FATAL(
"Pending messages n={0}",m_waiting_messages.size());
428void BasicParticleExchanger::
429_sendPendingMessages()
431 IParallelMng* pm = m_parallel_mng;
433 if (!m_message_list.get())
437 Timer::Sentry ts(m_timer);
439 Integer nb_message = m_pending_messages.size();
440 for( Integer i=0; i<nb_message; ++i ){
441 m_message_list->addMessage(m_pending_messages[i]);
442 m_waiting_messages.add(m_pending_messages[i]);
444 m_message_list->processPendingMessages();
445 m_pending_messages.clear();
452void BasicParticleExchanger::
453_serializeMessage(ISerializeMessage* sm,
454 Int32ConstArrayView acc_ids,
455 Int64Array& items_to_send_uid,
456 Int64Array& items_to_send_cells_uid)
458 ParticleInfoListView internal_items(m_item_family);
460 ISerializer* sbuf = sm->serializer();
461 sbuf->setMode(ISerializer::ModeReserve);
465 Integer nb_item = acc_ids.size();
469 sbuf->reserveInt64(1);
471 sbuf->reserveInt64(1);
472 sbuf->reserveSpan(eBasicDataType::Int64,nb_item);
475 sbuf->reserveSpan(eBasicDataType::Int64,nb_item);
477 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
478 IVariable* var = *i_var;
479 var->serialize(sbuf,acc_ids);
483 sbuf->allocateBuffer();
485 if (m_debug_exchange_items_level>=1)
486 info() <<
"BSE_SerializeMessage nb_item=" << nb_item
487 <<
" id=" << m_serialize_id
488 <<
" dest=" << sm->destination();
490 sbuf->setMode(ISerializer::ModePut);
492 sbuf->putInt64(m_serialize_id);
495 sbuf->putInt64(nb_item);
496 items_to_send_uid.resize(nb_item);
497 items_to_send_cells_uid.resize(nb_item);
498 for( Integer z=0; z<nb_item; ++z ){
499 Particle item = internal_items[acc_ids[z]];
500 items_to_send_uid[z] = item.uniqueId();
501 bool has_cell = item.hasCell();
502 items_to_send_cells_uid[z] = (has_cell) ? item.cell().uniqueId() : NULL_ITEM_UNIQUE_ID;
503 if (m_debug_exchange_items_level>=2){
504 info() <<
"Particle BufID=" << acc_ids[z]
505 <<
" LID=" << item.localId()
506 <<
" UID=" << items_to_send_uid[z]
507 <<
" CellIUID=" << items_to_send_cells_uid[z]
508 <<
" (owner=" << item.cell().owner() <<
")";
511 sbuf->putSpan(items_to_send_uid);
512 sbuf->putSpan(items_to_send_cells_uid);
514 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
515 IVariable* var = *i_var;
516 var->serialize(sbuf,acc_ids);
524void BasicParticleExchanger::
525_deserializeMessage(ISerializeMessage* message,
526 Int64Array& items_to_create_unique_id,
527 Int64Array& items_to_create_cells_unique_id,
528 Int32Array& items_to_create_local_id,
529 Int32Array& items_to_create_cells_local_id,
530 ItemGroup item_group,
531 Int32Array* new_particle_local_ids)
534 IMesh* mesh = m_item_family->mesh();
535 ISerializer* sbuf = message->serializer();
536 IItemFamily* cell_family = mesh->cellFamily();
539 sbuf->setMode(ISerializer::ModeGet);
540 sbuf->setReadMode(ISerializer::ReadReplace);
543 Int64 serialize_id = sbuf->getInt64();
544 Int64 nb_item = sbuf->getInt64();
545 if (m_debug_exchange_items_level>=1)
546 info() <<
"BSE_DeserializeMessage id=" << serialize_id <<
" nb=" << nb_item
547 <<
" orig=" << message->destination();
549 items_to_create_local_id.resize(nb_item);
550 items_to_create_unique_id.resize(nb_item);
551 items_to_create_cells_unique_id.resize(nb_item);
552 items_to_create_cells_local_id.resize(nb_item);
553 sbuf->getSpan(items_to_create_unique_id);
554 sbuf->getSpan(items_to_create_cells_unique_id);
555 if (m_debug_exchange_items_level>=2){
557 for( Integer z=0; z<nb_item; ++z ){
558 info() <<
"Particle UID=" << items_to_create_unique_id[z]
559 <<
" CellIUID=" << items_to_create_cells_unique_id[z];
563 items_to_create_cells_local_id.resize(nb_item);
564 cell_family->itemsUniqueIdToLocalId(items_to_create_cells_local_id,items_to_create_cells_unique_id);
566 m_item_family->toParticleFamily()->addParticles(items_to_create_unique_id,
567 items_to_create_cells_local_id,
568 items_to_create_local_id);
572 m_item_family->endUpdate();
576 ParticleInfoListView internal_items(m_item_family);
578 for( Integer z=0; z<nb_item; ++z ){
579 Particle item = internal_items[items_to_create_local_id[z]];
582 item.mutableItemBase().setOwner(m_rank,m_rank);
584 if (!item_group.null())
585 item_group.addItems(items_to_create_local_id,
false);
586 if (new_particle_local_ids)
587 new_particle_local_ids->addRange(items_to_create_local_id);
588 for( VariableCollection::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
589 IVariable* var = *i_var;
590 var->serialize(sbuf,items_to_create_local_id);
598void BasicParticleExchanger::
601 if (!m_waiting_messages.empty())
609void BasicParticleExchanger::
610addNewParticles(
Integer nb_particle)
612 ARCANE_UNUSED(nb_particle);
619void BasicParticleExchanger::
630ARCANE_REGISTER_SERVICE_BASICPARTICLEEXCHANGER(BasicParticleExchanger,BasicParticleExchanger);
633 BasicParticleExchanger);
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCANE_REGISTER_SUB_DOMAIN_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
Integer size() const
Nombre d'éléments du vecteur.
CaseOptionsBasicParticleExchanger * options() const
Options du jeu de données du service.
void clear()
Supprime les éléments du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Interface d'une famille d'entités.
virtual String name() const =0
Nom de la famille.
virtual IMesh * mesh() const =0
Maillage associé
virtual IParallelMng * parallelMng()=0
Gestionnaire de parallèlisme.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual Ref< ISerializeMessageList > createSerializeMessageListRef()=0
Créé une liste pour gérer les 'ISerializeMessage'.
virtual char reduce(eReduceType rt, char v)=0
Effectue la réduction de type rt sur le réel v et retourne la valeur.
Interface d'un échangeur de particules.
Groupe d'entités de maillage.
void clear()
Supprime les entités du groupe.
bool null() const
true is le groupe est le groupe nul
Interface d'un message de sérialisation entre IMessagePassingMng.
Exception lorsqu'une fonction n'est pas implémentée.
Message utilisant un SerializeBuffer.
Chaîne de caractères unicode.
Positionne la phase de l'action en cours d'exécution.
Sentinelle pour le timer. La sentinelle associée à un timer permet de déclancher celui-ci au moment d...
@ TimerReal
Timer utilisant le temps réel.
TraceMessage info() const
Flot pour un message d'information.
Vecteur 1D de données avec sémantique par valeur (style STL).
VariableList m_variables_to_exchange
Liste des variables à échanger.
Int32 m_max_nb_message_without_reduce
void sendItems(Integer nb_particle_finish_exchange, Int32ConstArrayView local_ids, Int32ConstArrayView sub_domains_to_send) override
@ ReduceSum
Somme des valeurs.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Array< Int32 > Int32Array
Tableau dynamique à une dimension d'entiers 32 bits.
std::int32_t Int32
Type entier signé sur 32 bits.