14#include "arcane/mesh/BasicParticleExchanger.h"
// Constructor: initializes ArcaneBasicParticleExchangerObject with the service
// build information `sbi`.
// NOTE(review): any further member initializers and the constructor body are
// not visible in this listing.
36BasicParticleExchanger::
37BasicParticleExchanger(
const ServiceBuildInfo& sbi)
38: ArcaneBasicParticleExchangerObject(sbi)
// Destructor: if messages are still pending or still waiting when the
// exchanger is destroyed, reports both counts through pwarning().
45BasicParticleExchanger::
46~BasicParticleExchanger()
49 if (!m_pending_messages.empty() || !m_waiting_messages.empty())
50 pwarning() << String::format(
"Pending or waiting messages nb_pending={0} nb_waiting={1}",
51 m_pending_messages.size(),m_waiting_messages.size());
// NOTE(review): the member name and parameter list of this void member are
// missing from this listing. The body reads `item_family`, so it presumably
// receives the particle family to work on — confirm against the header.
58void BasicParticleExchanger::
// Cache the parallel manager of the family's mesh and this instance's rank in it.
64 m_parallel_mng =
item_family->mesh()->parallelMng();
65 m_rank = m_parallel_mng->commRank();
// Timer of kind Timer::TimerReal; other members read it through
// lastActivationTime().
// NOTE(review): allocated with a raw `new`; the matching release is not visible
// in this listing — confirm who owns m_timer.
66 m_timer =
new Timer(m_parallel_mng->timerMng(),
"BasicParticleExchanger",Timer::TimerReal);
// Take the reduce threshold from the service options.
69 m_max_nb_message_without_reduce = options()->maxNbMessageWithoutReduce();
// Log the family name and the threshold that was read.
71 info() <<
"Initialize BasicParticleExchanger family=" <<
item_family->name();
72 info() <<
"-- MaxNbMessageWithoutReduce = " << m_max_nb_message_without_reduce;
// NOTE(review): the member name and parameter list are missing from this
// listing.
78void BasicParticleExchanger::
// Visit every message held in m_accumulate_infos. The loop body is missing from
// this listing (presumably it releases each message — confirm).
81 for(
auto msg : m_accumulate_infos )
// Empty the list once every message has been visited.
83 m_accumulate_infos.clear();
// NOTE(review): the member name and parameter list are missing from this
// listing. The visible statements reset the state used during one exchange.
89void BasicParticleExchanger::
// Re-read the debug level and the reduce threshold from the service options.
97 m_debug_exchange_items_level = options()->debugExchangeItemsLevel();
98 m_max_nb_message_without_reduce = options()->maxNbMessageWithoutReduce();
// Reset the per-exchange counters and flags.
102 m_last_nb_to_exchange = 0;
103 m_exchange_finished =
false;
104 m_print_info =
false;
105 m_current_nb_reduce = 0;
106 m_nb_particle_send = 0;
// At verbose level >= 1 a message that includes the current date is logged
// (the beginning of that statement is missing from this listing).
118 if (m_verbose_level>=1)
122 <<
" date=" << platform::getCurrentDateTime();
124 m_nb_total_particle_finish_exchange = 0;
// Rebuild the list of variables to exchange from the variables used by the
// family, then sort it by name.
// NOTE(review): the sort presumably gives sender and receiver the same
// serialization order — confirm.
130 m_variables_to_exchange.clear();
131 m_item_family->usedVariables(m_variables_to_exchange);
132 m_variables_to_exchange.sortByName(
true);
// NOTE(review): the member name, the parameter list and the statements that
// precede the log line are missing from this listing.
138void BasicParticleExchanger::
// Log (level 5) the last activation time measured by m_timer.
150 info(5) << A_FUNCINFO <<
"sendItems " << m_timer->lastActivationTime();
// Hand the queued messages over to the message list.
152 _sendPendingMessages();
// NOTE(review): the member name, the parameter list and the beginning of the
// body are missing from this listing; `item_group` and `functor` are used
// below, so they are presumably parameters.
158bool BasicParticleExchanger::
// Return the result of the four-argument _waitMessages overload, called with 0
// as first argument and a null third argument.
171 return _waitMessages(0,
item_group,
nullptr,functor);
// NOTE(review): only the return type and class qualifier of this member are
// visible; its name, parameters and body are missing from this listing.
177bool BasicParticleExchanger::
// NOTE(review): the member name and parameter list are missing from this
// listing; func_name below reports "BasicParticleExchanger::sendItems()".
// The visible statements sort the particles to send per destination, serialize
// one message per connected subdomain (taken from m_accumulate_infos), queue it
// together with a matching receive message, then remove the sent particles
// from the family.
196void BasicParticleExchanger::
202 IMesh* mesh = m_item_family->mesh();
204 String func_name(
"BasicParticleExchanger::sendItems()");
// Start from an empty list of messages being built.
214 m_accumulate_infos.clear();
// Tail of a statement whose beginning is missing from this listing; it ends
// with the MT_Send message type.
218 ISerializeMessage::MT_Send);
// NOTE(review): as listed, rank 0 warns " WRONG MESSAGE" and queues an extra
// MT_Recv message when i==0. This looks like a debugging aid — confirm whether
// it is actually compiled in (preprocessor lines are not visible here).
221 if (m_rank==0 && i==0){
222 warning() <<
" WRONG MESSAGE";
224 ISerializeMessage::MT_Recv);
225 m_pending_messages.add(
sm);
// Fill ids_to_send with the local ids to send, grouped per destination.
230 _addItemsToSend(local_ids,sub_domains_to_send,communicating_sub_domains,ids_to_send);
// Optional diagnostics: total number of particles to send and the number sent
// to each destination rank.
232 if (m_debug_exchange_items_level>=1){
233 info() <<
"-- Subdomain " << m_rank <<
". NB to send: " << local_ids.
size()
234 <<
" NB connected subdomain: " << nb_connected_sub_domain;
235 debug() <<
"NB connected subdomain for " << m_rank <<
" : " << m_accumulate_infos.size();
236 for( Integer i=0, s=m_accumulate_infos.size(); i<s; ++i ){
237 debug() <<
"NB for the subdomain " << m_accumulate_infos[i]->destRank()
238 <<
" " << ids_to_send[i].size();
// For each connected subdomain: serialize its particles into the send message
// and queue that message.
245 for( Integer j=0; j<nb_connected_sub_domain; ++j ){
246 ISerializeMessage* sm = m_accumulate_infos[j];
250 _serializeMessage(sm,ids_to_send[j],items_to_send_uid,
251 items_to_send_cells_uid);
253 m_pending_messages.add(sm);
// Also queue a receive message for the same peer rank, so that a receive is
// posted for every send.
256 auto* recv_sm =
new SerializeMessage(m_rank,sm->destination().value(),
257 ISerializeMessage::MT_Recv);
258 m_pending_messages.add(recv_sm);
// The send messages are now referenced from m_pending_messages; forget them here.
261 m_accumulate_infos.clear();
// Remove the particles listed in local_ids from the family and finalize the
// family update.
263 m_item_family->toParticleFamily()->removeParticles(local_ids);
264 m_item_family->endUpdate();
// Sorts the particles to send into one bucket per destination subdomain.
//
// local_ids                  local ids of the particles to send.
// sub_domains_to_send        destination subdomain of each particle, read at the
//                            same index as local_ids.
// communicating_sub_domains  subdomains searched to find the bucket index of a
//                            destination.
// ids_to_send                output buckets; bucket sd_index receives the local
//                            ids, and the traces below print it next to
//                            communicating_sub_domains[sd_index].
//
// A particle whose destination is this rank triggers ARCANE_FATAL.
270void BasicParticleExchanger::
271_addItemsToSend(Int32ConstArrayView local_ids,
272 Int32ConstArrayView sub_domains_to_send,
273 Int32ConstArrayView communicating_sub_domains,
274 UniqueArray< SharedArray<Int32> >& ids_to_send)
// Local copy of the debug level, tested several times below.
276 const Int32 debug_exchange_items_level = m_debug_exchange_items_level;
// The number of buckets defines the number of connected subdomains.
277 Int32 nb_connected_sub_domain = ids_to_send.size();
280 Int32 id_size = local_ids.size();
281 for( Integer i=0; i<id_size; ++i ){
282 Int32 item_local_id = local_ids[i];
283 Int32 sd_to_send = sub_domains_to_send[i];
// Sending a particle to its own subdomain is an error (the end of the
// ARCANE_FATAL call is missing from this listing).
284 if (sd_to_send==m_rank)
286 ARCANE_FATAL(
"The entity with local index {0} should not be sent to its own subdomain",
// Linear search for the destination; sd_index starts at the "not found" value
// nb_connected_sub_domain.
// NOTE(review): the statements executed on a match are missing from this
// listing (presumably sd_index = i_sd followed by a break — confirm).
291 Integer sd_index = nb_connected_sub_domain;
292 for( Integer i_sd=0; i_sd<nb_connected_sub_domain; ++i_sd )
293 if (sd_to_send==communicating_sub_domains[i_sd]){
// Destination not found: the statement executed in this case is missing from
// this listing.
297 if (sd_index==nb_connected_sub_domain)
// Record the particle in the bucket of its destination.
299 ids_to_send[sd_index].add(item_local_id);
300 if (debug_exchange_items_level>=1)
301 pinfo() <<
"ADD ITEM TO SEND lid=" << item_local_id <<
" sd_index=" << sd_index
302 <<
" sd=" << communicating_sub_domains[sd_index] <<
" n=" << ids_to_send[sd_index].size()
303 <<
" begin=" << ids_to_send[sd_index].data();
// Debug summary: size and data pointer of every bucket.
305 if (debug_exchange_items_level>=1)
306 for( Integer i=0; i<nb_connected_sub_domain; ++i )
307 pinfo() <<
"SEND INFO sd_index=" << i
308 <<
" sd=" << communicating_sub_domains[i] <<
" n=" << ids_to_send[i].size()
309 <<
" begin=" << ids_to_send[i].data();
// More detailed trace, limited to the first 5 particles at most.
// NOTE(review): `rank` is used below but its declaration is missing from this
// listing (presumably obtained from `pm` — confirm).
311 if ((debug_exchange_items_level>=1 && m_print_info) || debug_exchange_items_level>=2){
312 IParallelMng* pm = m_parallel_mng;
314 ParticleInfoListView items(m_item_family);
315 Integer nb_print = math::min(5,id_size);
316 for( Integer i=0; i<nb_print; ++i ){
317 Particle part(items[local_ids[i]]);
318 pinfo() <<
" RANK=" << rank <<
" LID=" << local_ids[i] <<
" SD=" << sub_domains_to_send[i]
319 <<
" uid=" << part.uniqueId() <<
" cell=" << ItemPrinter(part.cell());
// NOTE(review): only the return type and class qualifier of this member are
// visible; its name, parameters and body are missing from this listing.
328bool BasicParticleExchanger::
// NOTE(review): the member name and parameter list are missing from this
// listing; func_name below reports "BasicParticleExchanger::waitMessages".
// The visible statements decide whether a reduction is due, update the reduce
// counters and return m_exchange_finished.
337bool BasicParticleExchanger::
341 String func_name =
"BasicParticleExchanger::waitMessages";
// A reduction is requested when the current counter exceeds m_last_nb_reduce,
// or, unless the threshold is -1, when it exceeds
// m_max_nb_message_without_reduce.
344 bool do_reduce = m_current_nb_reduce>m_last_nb_reduce;
345 if (m_max_nb_message_without_reduce!=(-1))
346 do_reduce |= m_current_nb_reduce>m_max_nb_message_without_reduce;
// Trace of the time measured by m_timer (the rest of this statement and the
// reduction itself are missing from this listing).
356 info(4) << func_name <<
"TimeReduce=" << m_timer->lastActivationTime()
// NOTE(review): nb_to_exchange is computed in statements missing from this
// listing.
362 m_last_nb_to_exchange = nb_to_exchange;
365 ++m_current_nb_reduce;
366 debug() << func_name <<
" ** RETURN EXCHANGE m_exchange_finished: " << m_exchange_finished;
// When the exchange is finished, store the counter value used as the reduce
// threshold by the test at the top of this function.
// NOTE(review): the rationale for the constant 4 is not visible here.
367 if (m_exchange_finished){
368 m_last_nb_reduce = m_current_nb_reduce - 4;
369 if (m_verbose_level>=1)
370 info() << func_name <<
" exchange finished "
371 <<
" n=" << m_current_nb_reduce
372 <<
" date=" << platform::getCurrentDateTime();
374 return m_exchange_finished;
// Runs the functor, waits for all messages handled by m_message_list, then
// deserializes every message that was waiting.
//
// item_group              forwarded to _deserializeMessage for each message.
// new_particle_local_ids  forwarded to _deserializeMessage for each message.
// functor                 executed (and timed) before waiting.
// NOTE(review): any null check guarding the functor call is not visible in this
// listing — confirm.
380void BasicParticleExchanger::
381_waitMessages(ItemGroup item_group,Int32Array* new_particle_local_ids,IFunctor* functor)
// Time the execution of the functor and accumulate it in m_total_time_functor.
385 Timer::Sentry ts(m_timer);
386 functor->executeFunctor();
388 m_total_time_functor += m_timer->lastActivationTime();
389 if (m_debug_exchange_items_level>=1)
390 info() <<
"TimeFunctor: current=" << m_timer->lastActivationTime()
391 <<
" total=" << m_total_time_functor;
// Time the wait for all messages and accumulate it in m_total_time_waiting.
395 Timer::Sentry ts(m_timer);
396 m_message_list->waitMessages(Parallel::WaitAll);
398 m_total_time_waiting += m_timer->lastActivationTime();
399 if (m_debug_exchange_items_level>=1)
400 info() <<
"TimeWaiting: current=" << m_timer->lastActivationTime()
401 <<
" total=" << m_total_time_waiting;
// Work on a copy of the waiting list and empty the member before processing.
405 UniqueArray<ISerializeMessage*> current_messages(m_waiting_messages);
406 m_waiting_messages.clear();
// Deserialize every message of the copy (the declarations of the scratch
// arrays passed below are missing from this listing).
412 for( ISerializeMessage* sm : current_messages ){
414 _deserializeMessage(sm,items_to_create_unique_id,items_to_create_cells_unique_id,
415 items_to_create_local_id,items_to_create_cells_local_id,
416 item_group,new_particle_local_ids);
// The waiting list is expected to still be empty at this point; otherwise this
// is a fatal error.
419 if (!m_waiting_messages.empty())
420 ARCANE_FATAL(
"Pending messages n={0}",m_waiting_messages.size());
// Moves every pending message to the message list and to the waiting list,
// asks the message list to process them, then empties the pending list.
426void BasicParticleExchanger::
427_sendPendingMessages()
429 IParallelMng* pm = m_parallel_mng;
// NOTE(review): the statement executed when m_message_list is empty is missing
// from this listing (presumably the list is created from `pm` — confirm).
431 if (!m_message_list.get())
// The transfer below is timed with m_timer.
435 Timer::Sentry ts(m_timer);
437 Integer nb_message = m_pending_messages.size();
438 for( Integer i=0; i<nb_message; ++i ){
// Each message is registered in the message list and remembered as waiting.
439 m_message_list->addMessage(m_pending_messages[i]);
440 m_waiting_messages.add(m_pending_messages[i]);
442 m_message_list->processPendingMessages();
443 m_pending_messages.clear();
// Writes the particles listed in `acc_ids` into the serializer of message `sm`.
//
// sm                       message whose serializer receives the data.
// acc_ids                  local ids of the particles to serialize.
// items_to_send_uid        scratch array, resized to the number of particles and
//                          filled with their unique ids.
// items_to_send_cells_uid  scratch array, resized likewise and filled with the
//                          unique id of each particle's cell, or
//                          NULL_ITEM_UNIQUE_ID when the particle has no cell.
//
// The serializer is used in two passes that follow the same order: ModeReserve
// to size the buffer, then ModePut to write the message id, the particle count,
// the particle unique ids, the cell unique ids and finally every variable of
// m_variables_to_exchange.
450void BasicParticleExchanger::
451_serializeMessage(ISerializeMessage* sm,
452 Int32ConstArrayView acc_ids,
453 Int64Array& items_to_send_uid,
454 Int64Array& items_to_send_cells_uid)
456 ParticleInfoListView internal_items(m_item_family);
// Pass 1: reserve space.
458 ISerializer* sbuf = sm->serializer();
459 sbuf->setMode(ISerializer::ModeReserve);
463 Integer nb_item = acc_ids.size();
// Two single Int64 values; they match the two putInt64() calls of pass 2
// (message id and number of particles).
467 sbuf->reserve(DT_Int64,1);
469 sbuf->reserve(DT_Int64,1);
// Two spans of nb_item Int64; they match the two putSpan() calls of pass 2
// (particle unique ids and cell unique ids).
470 sbuf->reserveSpan(DT_Int64,nb_item);
473 sbuf->reserveSpan(DT_Int64,nb_item);
// Let every exchanged variable reserve the space it needs for these particles.
475 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
476 IVariable* var = *i_var;
477 var->serialize(sbuf,acc_ids);
// Allocate the buffer once everything has been reserved.
481 sbuf->allocateBuffer();
483 if (m_debug_exchange_items_level>=1)
484 info() <<
"BSE_SerializeMessage nb_item=" << nb_item
485 <<
" id=" << m_serialize_id
486 <<
" dest=" << sm->destination();
// Pass 2: write the data, in the same order as the reservations above.
488 sbuf->setMode(ISerializer::ModePut);
// NOTE(review): any update of m_serialize_id after this call is not visible in
// this listing.
490 sbuf->putInt64(m_serialize_id);
493 sbuf->putInt64(nb_item);
494 items_to_send_uid.resize(nb_item);
495 items_to_send_cells_uid.resize(nb_item);
// Collect the unique id of each particle and of its cell.
496 for( Integer z=0; z<nb_item; ++z ){
497 Particle item = internal_items[acc_ids[z]];
498 items_to_send_uid[z] = item.uniqueId();
499 bool has_cell = item.hasCell();
500 items_to_send_cells_uid[z] = (has_cell) ? item.cell().uniqueId() : NULL_ITEM_UNIQUE_ID;
// NOTE(review): the trace below calls item.cell().owner() without testing
// has_cell; confirm this is safe for a particle that has no cell.
501 if (m_debug_exchange_items_level>=2){
502 info() <<
"Particle BufID=" << acc_ids[z]
503 <<
" LID=" << item.localId()
504 <<
" UID=" << items_to_send_uid[z]
505 <<
" CellIUID=" << items_to_send_cells_uid[z]
506 <<
" (owner=" << item.cell().owner() <<
")";
509 sbuf->putSpan(items_to_send_uid);
510 sbuf->putSpan(items_to_send_cells_uid);
// Write the value of every exchanged variable for these particles.
512 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
513 IVariable* var = *i_var;
514 var->serialize(sbuf,acc_ids);
// Reads one message and creates the particles it describes in the family.
//
// message                          message whose serializer is read.
// items_to_create_unique_id        scratch array: unique ids read from the message.
// items_to_create_cells_unique_id  scratch array: cell unique ids read from the
//                                  message.
// items_to_create_local_id         scratch array: local ids of the created
//                                  particles.
// items_to_create_cells_local_id   scratch array: local ids of the cells,
//                                  resolved from their unique ids.
// item_group                       if not null, receives the created particles.
// new_particle_local_ids           if not null, the created local ids are
//                                  appended to it.
//
// Values are read in this order: message id, particle count, particle unique
// ids, cell unique ids, then the variables of m_variables_to_exchange.
522void BasicParticleExchanger::
523_deserializeMessage(ISerializeMessage* message,
524 Int64Array& items_to_create_unique_id,
525 Int64Array& items_to_create_cells_unique_id,
526 Int32Array& items_to_create_local_id,
527 Int32Array& items_to_create_cells_local_id,
528 ItemGroup item_group,
529 Int32Array* new_particle_local_ids)
532 IMesh* mesh = m_item_family->mesh();
533 ISerializer* sbuf = message->serializer();
534 IItemFamily* cell_family = mesh->
cellFamily();
// Switch the serializer to reading.
537 sbuf->setMode(ISerializer::ModeGet);
538 sbuf->setReadMode(ISerializer::ReadReplace);
// Header: message id, then number of particles.
541 Int64 serialize_id = sbuf->getInt64();
542 Int64 nb_item = sbuf->getInt64();
// NOTE(review): the trace labels the value "orig" but prints
// message->destination(); presumably this is the peer rank of a receive
// message — confirm.
543 if (m_debug_exchange_items_level>=1)
544 info() <<
"BSE_DeserializeMessage id=" << serialize_id <<
" nb=" << nb_item
545 <<
" orig=" << message->destination();
// Size the scratch arrays for nb_item particles, then read both id spans.
547 items_to_create_local_id.resize(nb_item);
548 items_to_create_unique_id.resize(nb_item);
549 items_to_create_cells_unique_id.resize(nb_item);
550 items_to_create_cells_local_id.resize(nb_item);
551 sbuf->getSpan(items_to_create_unique_id);
552 sbuf->getSpan(items_to_create_cells_unique_id);
553 if (m_debug_exchange_items_level>=2){
555 for( Integer z=0; z<nb_item; ++z ){
556 info() <<
"Particle UID=" << items_to_create_unique_id[z]
557 <<
" CellIUID=" << items_to_create_cells_unique_id[z];
// This array was already resized to nb_item above; the call below repeats it.
561 items_to_create_cells_local_id.resize(nb_item);
// Resolve the cell unique ids to local ids in the cell family.
562 cell_family->itemsUniqueIdToLocalId(items_to_create_cells_local_id,items_to_create_cells_unique_id);
// Create the particles; their local ids are returned in items_to_create_local_id.
564 m_item_family->toParticleFamily()->addParticles(items_to_create_unique_id,
565 items_to_create_cells_local_id,
566 items_to_create_local_id);
// Finalize the family update before the new particles are accessed below.
570 m_item_family->endUpdate();
574 ParticleInfoListView internal_items(m_item_family);
// Set the owner of every created particle to this rank.
576 for( Integer z=0; z<nb_item; ++z ){
577 Particle item = internal_items[items_to_create_local_id[z]];
580 item.mutableItemBase().setOwner(m_rank,m_rank);
582 if (!item_group.null())
583 item_group.addItems(items_to_create_local_id,
false);
584 if (new_particle_local_ids)
585 new_particle_local_ids->addRange(items_to_create_local_id);
// Read the variable values of the created particles from the message.
586 for( VariableCollection::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
587 IVariable* var = *i_var;
588 var->serialize(sbuf,items_to_create_local_id);
// NOTE(review): the member name and parameter list are missing from this
// listing.
596void BasicParticleExchanger::
// Tests whether messages are still waiting; the statement executed in that
// case is missing from this listing.
599 if (!m_waiting_messages.empty())
// NOTE(review): only the return type and class qualifier of this member are
// visible; its name, parameters and body are missing from this listing.
607void BasicParticleExchanger::
// NOTE(review): only the return type and class qualifier of this member are
// visible; its name, parameters and body are missing from this listing.
617void BasicParticleExchanger::
// Service registration: invokes the registration macro with the class
// BasicParticleExchanger under the name BasicParticleExchanger.
628ARCANE_REGISTER_SERVICE_BASICPARTICLEEXCHANGER(BasicParticleExchanger,BasicParticleExchanger);
// Tail of a second macro invocation whose beginning is missing from this
// listing.
631 BasicParticleExchanger);
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lançant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCANE_REGISTER_SUB_DOMAIN_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
Tableau d'items de types quelconques.
Interface d'une famille d'entités.
virtual IItemFamily * cellFamily()=0
Retourne la famille des mailles.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual Ref< ISerializeMessageList > createSerializeMessageListRef()=0
Crée une liste pour gérer les 'ISerializeMessage'.
virtual char reduce(eReduceType rt, char v)=0
Effectue la réduction de type rt sur le réel v et retourne la valeur.
virtual void barrier()=0
Effectue une barrière.
Interface d'un échangeur de particules.
Groupe d'entités de maillage.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Message utilisant un SerializeBuffer.
Positionne la phase de l'action en cours d'exécution.
Sentinelle pour le timer. La sentinelle associée à un timer permet de déclencher celui-ci au moment d...
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Interface d'un message de sérialisation entre IMessagePassingMng.
Exception lorsqu'une fonction n'est pas implémentée.
Chaîne de caractères unicode.
Vecteur 1D de données avec sémantique par valeur (style STL).
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Int32 Integer
Type représentant un entier.