14#include "arcane/mesh/BasicParticleExchanger.h"
16#include "arcane/core/internal/SerializeMessage.h"
38BasicParticleExchanger::
39BasicParticleExchanger(
const ServiceBuildInfo& sbi)
40: ArcaneBasicParticleExchangerObject(sbi)
47BasicParticleExchanger::
48~BasicParticleExchanger()
51 if (!m_pending_messages.empty() || !m_waiting_messages.empty())
52 pwarning() << String::format(
"Pending or waiting messages nb_pending={0} nb_waiting={1}",
53 m_pending_messages.size(),m_waiting_messages.size());
60void BasicParticleExchanger::
65 m_item_family = item_family;
72 m_support_shmem_variables =
options()->supportShmemVariables();
75 info() <<
"Initialize BasicParticleExchanger family=" << item_family->
name();
77 info() <<
"-- SupportShmemVariables = " << m_support_shmem_variables;
83void BasicParticleExchanger::
86 for(
auto msg : m_accumulate_infos )
88 m_accumulate_infos.clear();
94void BasicParticleExchanger::
95beginNewExchange(
Integer i_nb_particle)
98 String function_id =
"BasicParticleExchanger::beginNewExchange >>> ";
102 m_debug_exchange_items_level =
options()->debugExchangeItemsLevel();
107 m_last_nb_to_exchange = 0;
108 m_exchange_finished =
false;
109 m_print_info =
false;
110 m_current_nb_reduce = 0;
111 m_nb_particle_send = 0;
114 Int64 nb_particle = i_nb_particle;
115 Int64 min_nb_particle = 0;
116 Int64 max_nb_particle = 0;
117 Int64 nb_total_particle = 0;
121 nb_total_particle,min_rank,max_rank);
123 if (m_verbose_level>=1)
124 info() << function_id <<
"** NB PARTICLES IN VOL total=" << nb_total_particle
125 <<
" min=" << min_nb_particle <<
" max=" << max_nb_particle
126 <<
" min_rank=" << min_rank <<
" max_rank=" << max_rank
129 m_nb_total_particle_finish_exchange = 0;
143void BasicParticleExchanger::
144sendItems(
Integer nb_particle_finish_exchange,
148 ARCANE_UNUSED(nb_particle_finish_exchange);
150 m_nb_particle_send = local_ids.
size();
153 _generateSendItems(local_ids,sub_domains_to_send);
155 info(5) << A_FUNCINFO <<
"sendItems " <<
m_timer->lastActivationTime();
157 _sendPendingMessages();
163bool BasicParticleExchanger::
164exchangeItems(
Integer nb_particle_finish_exchange,
171 sendItems(nb_particle_finish_exchange,local_ids,sub_domains_to_send);
173 if (!item_group.
null())
176 return _waitMessages(0,item_group,
nullptr,functor);
182bool BasicParticleExchanger::
183exchangeItems(
Integer nb_particle_finish_exchange,
190 sendItems(nb_particle_finish_exchange,local_ids,sub_domains_to_send);
192 if (new_particle_local_ids)
193 new_particle_local_ids->
clear();
195 return _waitMessages(0,
ItemGroup(),new_particle_local_ids,functor);
201void BasicParticleExchanger::
209 String func_name(
"BasicParticleExchanger::sendItems()");
212 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
214 Integer nb_connected_sub_domain = communicating_sub_domains.
size();
219 m_accumulate_infos.clear();
220 m_accumulate_infos.resize(nb_connected_sub_domain);
221 for(
Integer i=0; i<nb_connected_sub_domain; ++i ){
222 m_accumulate_infos[i] =
new SerializeMessage(m_rank,communicating_sub_domains[i],
223 ISerializeMessage::MT_Send);
226 if (m_rank==0 && i==0){
227 warning() <<
" WRONG MESSAGE";
229 ISerializeMessage::MT_Recv);
230 m_pending_messages.add(sm);
235 _addItemsToSend(local_ids,sub_domains_to_send,communicating_sub_domains,ids_to_send);
237 if (m_debug_exchange_items_level>=1){
238 info() <<
"-- Subdomain " << m_rank <<
". NB to send: " << local_ids.
size()
239 <<
" NB connected subdomain: " << nb_connected_sub_domain;
240 debug() <<
"NB connected subdomain for " << m_rank <<
" : " << m_accumulate_infos.size();
241 for( Integer i=0, s=m_accumulate_infos.size(); i<s; ++i ){
242 debug() <<
"NB for the subdomain " << m_accumulate_infos[i]->destRank()
243 <<
" " << ids_to_send[i].size();
250 for( Integer j=0; j<nb_connected_sub_domain; ++j ){
251 ISerializeMessage* sm = m_accumulate_infos[j];
255 _serializeMessage(sm,ids_to_send[j],items_to_send_uid,
256 items_to_send_cells_uid);
258 m_pending_messages.add(sm);
261 auto* recv_sm =
new SerializeMessage(m_rank,sm->destination().value(),
262 ISerializeMessage::MT_Recv);
263 m_pending_messages.add(recv_sm);
266 m_accumulate_infos.
clear();
268 m_item_family->toParticleFamily()->removeParticles(local_ids);
269 m_item_family->endUpdate();
275void BasicParticleExchanger::
276_addItemsToSend(Int32ConstArrayView local_ids,
277 Int32ConstArrayView sub_domains_to_send,
278 Int32ConstArrayView communicating_sub_domains,
279 UniqueArray< SharedArray<Int32> >& ids_to_send)
281 const Int32 debug_exchange_items_level = m_debug_exchange_items_level;
282 Int32 nb_connected_sub_domain = ids_to_send.size();
285 Int32 id_size = local_ids.size();
286 for( Integer i=0; i<id_size; ++i ){
287 Int32 item_local_id = local_ids[i];
288 Int32 sd_to_send = sub_domains_to_send[i];
289 if (sd_to_send==m_rank)
291 ARCANE_FATAL(
"The entity with local index {0} should not be sent to its own subdomain",
296 Integer sd_index = nb_connected_sub_domain;
297 for( Integer i_sd=0; i_sd<nb_connected_sub_domain; ++i_sd )
298 if (sd_to_send==communicating_sub_domains[i_sd]){
302 if (sd_index==nb_connected_sub_domain)
304 ids_to_send[sd_index].add(item_local_id);
305 if (debug_exchange_items_level>=1)
306 pinfo() <<
"ADD ITEM TO SEND lid=" << item_local_id <<
" sd_index=" << sd_index
307 <<
" sd=" << communicating_sub_domains[sd_index] <<
" n=" << ids_to_send[sd_index].size()
308 <<
" begin=" << ids_to_send[sd_index].data();
310 if (debug_exchange_items_level>=1)
311 for( Integer i=0; i<nb_connected_sub_domain; ++i )
312 pinfo() <<
"SEND INFO sd_index=" << i
313 <<
" sd=" << communicating_sub_domains[i] <<
" n=" << ids_to_send[i].size()
314 <<
" begin=" << ids_to_send[i].data();
316 if ((debug_exchange_items_level>=1 && m_print_info) || debug_exchange_items_level>=2){
317 IParallelMng* pm = m_parallel_mng;
318 Int32 rank = pm->commRank();
319 ParticleInfoListView items(m_item_family);
320 Integer nb_print = math::min(5,id_size);
321 for( Integer i=0; i<nb_print; ++i ){
322 Particle part(items[local_ids[i]]);
323 pinfo() <<
" RANK=" << rank <<
" LID=" << local_ids[i] <<
" SD=" << sub_domains_to_send[i]
324 <<
" uid=" << part.uniqueId() <<
" cell=" << ItemPrinter(part.cell());
333bool BasicParticleExchanger::
336 return _waitMessages(nb_pending_particle,
ItemGroup(),new_particle_local_ids,functor);
342bool BasicParticleExchanger::
346 String func_name =
"BasicParticleExchanger::waitMessages";
347 _waitMessages(item_group,new_particle_local_ids,functor);
349 bool do_reduce = m_current_nb_reduce>m_last_nb_reduce;
350 if (m_max_nb_message_without_reduce!=(-1))
351 do_reduce |= m_current_nb_reduce>m_max_nb_message_without_reduce;
354 Int64 nb_to_exchange = 0;
358 Int64 current_exchange = m_nb_particle_send+nb_pending_particle;
361 info(4) << func_name <<
"TimeReduce=" << m_timer->lastActivationTime()
362 <<
" nbtoexchange=" << nb_to_exchange;
363 m_exchange_finished = (nb_to_exchange==0);
364 if (nb_to_exchange>0 && m_last_nb_to_exchange==nb_to_exchange && m_nb_loop>300){
367 m_last_nb_to_exchange = nb_to_exchange;
370 ++m_current_nb_reduce;
371 debug() << func_name <<
" ** RETURN EXCHANGE m_exchange_finished: " << m_exchange_finished;
372 if (m_exchange_finished){
373 m_last_nb_reduce = m_current_nb_reduce - 4;
374 if (m_verbose_level>=1)
375 info() << func_name <<
" exchange finished "
376 <<
" n=" << m_current_nb_reduce
377 <<
" date=" << platform::getCurrentDateTime();
379 return m_exchange_finished;
385void BasicParticleExchanger::
386_waitMessages(ItemGroup item_group,Int32Array* new_particle_local_ids,IFunctor* functor)
390 Timer::Sentry ts(m_timer);
391 functor->executeFunctor();
393 m_total_time_functor += m_timer->lastActivationTime();
394 if (m_debug_exchange_items_level>=1)
395 info() <<
"TimeFunctor: current=" << m_timer->lastActivationTime()
396 <<
" total=" << m_total_time_functor;
400 Timer::Sentry ts(m_timer);
401 m_message_list->waitMessages(Parallel::WaitAll);
403 m_total_time_waiting += m_timer->lastActivationTime();
404 if (m_debug_exchange_items_level>=1)
405 info() <<
"TimeWaiting: current=" << m_timer->lastActivationTime()
406 <<
" total=" << m_total_time_waiting;
410 UniqueArray<ISerializeMessage*> current_messages(m_waiting_messages);
411 m_waiting_messages.clear();
413 Int32 nb_end_update = 0;
414 Int32 max_nb_messages = 0;
415 if (m_support_shmem_variables) {
416 max_nb_messages = m_parallel_mng->reduce(MessagePassing::ReduceMax, current_messages.size());
423 for (ISerializeMessage* sm : current_messages) {
425 _deserializeMessage(sm,items_to_create_unique_id,items_to_create_cells_unique_id,
426 items_to_create_local_id,items_to_create_cells_local_id,
427 item_group,new_particle_local_ids);
433 if (m_support_shmem_variables) {
434 for (; nb_end_update < max_nb_messages; ++nb_end_update) {
435 m_item_family->endUpdate();
439 if (!m_waiting_messages.empty())
440 ARCANE_FATAL(
"Pending messages n={0}",m_waiting_messages.size());
446void BasicParticleExchanger::
447_sendPendingMessages()
449 IParallelMng* pm = m_parallel_mng;
451 if (!m_message_list.get())
455 Timer::Sentry ts(m_timer);
457 Integer nb_message = m_pending_messages.size();
458 for( Integer i=0; i<nb_message; ++i ){
459 m_message_list->addMessage(m_pending_messages[i]);
460 m_waiting_messages.add(m_pending_messages[i]);
462 m_message_list->processPendingMessages();
463 m_pending_messages.clear();
470void BasicParticleExchanger::
471_serializeMessage(ISerializeMessage* sm,
472 Int32ConstArrayView acc_ids,
473 Int64Array& items_to_send_uid,
474 Int64Array& items_to_send_cells_uid)
476 ParticleInfoListView internal_items(m_item_family);
478 ISerializer* sbuf = sm->serializer();
479 sbuf->setMode(ISerializer::ModeReserve);
483 Integer nb_item = acc_ids.size();
487 sbuf->reserveInt64(1);
489 sbuf->reserveInt64(1);
490 sbuf->reserveSpan(eBasicDataType::Int64,nb_item);
493 sbuf->reserveSpan(eBasicDataType::Int64,nb_item);
495 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
496 IVariable* var = *i_var;
497 var->serialize(sbuf,acc_ids);
501 sbuf->allocateBuffer();
503 if (m_debug_exchange_items_level>=1)
504 info() <<
"BSE_SerializeMessage nb_item=" << nb_item
505 <<
" id=" << m_serialize_id
506 <<
" dest=" << sm->destination();
508 sbuf->setMode(ISerializer::ModePut);
510 sbuf->putInt64(m_serialize_id);
513 sbuf->putInt64(nb_item);
514 items_to_send_uid.resize(nb_item);
515 items_to_send_cells_uid.resize(nb_item);
516 for( Integer z=0; z<nb_item; ++z ){
517 Particle item = internal_items[acc_ids[z]];
518 items_to_send_uid[z] = item.uniqueId();
519 bool has_cell = item.hasCell();
520 items_to_send_cells_uid[z] = (has_cell) ? item.cell().uniqueId() : NULL_ITEM_UNIQUE_ID;
521 if (m_debug_exchange_items_level>=2){
522 info() <<
"Particle BufID=" << acc_ids[z]
523 <<
" LID=" << item.localId()
524 <<
" UID=" << items_to_send_uid[z]
525 <<
" CellIUID=" << items_to_send_cells_uid[z]
526 <<
" (owner=" << item.cell().owner() <<
")";
529 sbuf->putSpan(items_to_send_uid);
530 sbuf->putSpan(items_to_send_cells_uid);
532 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
533 IVariable* var = *i_var;
534 var->serialize(sbuf,acc_ids);
542void BasicParticleExchanger::
543_deserializeMessage(ISerializeMessage* message,
544 Int64Array& items_to_create_unique_id,
545 Int64Array& items_to_create_cells_unique_id,
546 Int32Array& items_to_create_local_id,
547 Int32Array& items_to_create_cells_local_id,
548 ItemGroup item_group,
549 Int32Array* new_particle_local_ids)
552 IMesh* mesh = m_item_family->mesh();
553 ISerializer* sbuf = message->serializer();
554 IItemFamily* cell_family = mesh->cellFamily();
557 sbuf->setMode(ISerializer::ModeGet);
558 sbuf->setReadMode(ISerializer::ReadReplace);
561 Int64 serialize_id = sbuf->getInt64();
562 Int64 nb_item = sbuf->getInt64();
563 if (m_debug_exchange_items_level>=1)
564 info() <<
"BSE_DeserializeMessage id=" << serialize_id <<
" nb=" << nb_item
565 <<
" orig=" << message->destination();
567 items_to_create_local_id.resize(nb_item);
568 items_to_create_unique_id.resize(nb_item);
569 items_to_create_cells_unique_id.resize(nb_item);
570 items_to_create_cells_local_id.resize(nb_item);
571 sbuf->getSpan(items_to_create_unique_id);
572 sbuf->getSpan(items_to_create_cells_unique_id);
573 if (m_debug_exchange_items_level>=2){
575 for( Integer z=0; z<nb_item; ++z ){
576 info() <<
"Particle UID=" << items_to_create_unique_id[z]
577 <<
" CellIUID=" << items_to_create_cells_unique_id[z];
581 items_to_create_cells_local_id.resize(nb_item);
582 cell_family->itemsUniqueIdToLocalId(items_to_create_cells_local_id,items_to_create_cells_unique_id);
584 m_item_family->toParticleFamily()->addParticles(items_to_create_unique_id,
585 items_to_create_cells_local_id,
586 items_to_create_local_id);
590 m_item_family->endUpdate();
594 ParticleInfoListView internal_items(m_item_family);
596 for( Integer z=0; z<nb_item; ++z ){
597 Particle item = internal_items[items_to_create_local_id[z]];
600 item.mutableItemBase().setOwner(m_rank,m_rank);
602 if (!item_group.null())
603 item_group.addItems(items_to_create_local_id,
false);
604 if (new_particle_local_ids)
605 new_particle_local_ids->addRange(items_to_create_local_id);
606 for( VariableCollection::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
607 IVariable* var = *i_var;
608 var->serialize(sbuf,items_to_create_local_id);
616void BasicParticleExchanger::
619 if (!m_waiting_messages.empty())
627void BasicParticleExchanger::
628addNewParticles(
Integer nb_particle)
630 ARCANE_UNUSED(nb_particle);
637void BasicParticleExchanger::
648ARCANE_REGISTER_SERVICE_BASICPARTICLEEXCHANGER(BasicParticleExchanger,BasicParticleExchanger);
651 BasicParticleExchanger);
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCANE_REGISTER_SUB_DOMAIN_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
Integer size() const
Nombre d'éléments du vecteur.
CaseOptionsBasicParticleExchanger * options() const
Options du jeu de données du service.
void clear()
Supprime les éléments du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Interface d'une famille d'entités.
virtual String name() const =0
Nom de la famille.
virtual IMesh * mesh() const =0
Maillage associé
virtual IParallelMng * parallelMng()=0
Gestionnaire de parallèlisme.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual Ref< ISerializeMessageList > createSerializeMessageListRef()=0
Créé une liste pour gérer les 'ISerializeMessage'.
virtual char reduce(eReduceType rt, char v)=0
Effectue la réduction de type rt sur le réel v et retourne la valeur.
Interface d'un échangeur de particules.
Groupe d'entités de maillage.
void clear()
Supprime les entités du groupe.
bool null() const
true is le groupe est le groupe nul
Interface d'un message de sérialisation entre IMessagePassingMng.
Exception lorsqu'une fonction n'est pas implémentée.
Message utilisant un SerializeBuffer.
Chaîne de caractères unicode.
Positionne la phase de l'action en cours d'exécution.
Sentinelle pour le timer. La sentinelle associée à un timer permet de déclancher celui-ci au moment d...
@ TimerReal
Timer utilisant le temps réel.
TraceMessage info() const
Flot pour un message d'information.
Vecteur 1D de données avec sémantique par valeur (style STL).
VariableList m_variables_to_exchange
Liste des variables à échanger.
Int32 m_max_nb_message_without_reduce
void sendItems(Integer nb_particle_finish_exchange, Int32ConstArrayView local_ids, Int32ConstArrayView sub_domains_to_send) override
@ ReduceSum
Somme des valeurs.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Array< Int32 > Int32Array
Tableau dynamique à une dimension d'entiers 32 bits.
std::int32_t Int32
Type entier signé sur 32 bits.