14#include "arcane/mesh/BasicParticleExchanger.h"
16#include "arcane/core/internal/SerializeMessage.h"
// Constructor. Passes the service build information `sbi` on to the
// ArcaneBasicParticleExchangerObject base class.
37BasicParticleExchanger::
38BasicParticleExchanger(
const ServiceBuildInfo& sbi)
39: ArcaneBasicParticleExchangerObject(sbi)
// Destructor.
46BasicParticleExchanger::
47~BasicParticleExchanger()
// Messages still present in the pending or the waiting list are reported
// through a warning that carries the size of both lists.
50 if (!m_pending_messages.empty() || !m_waiting_messages.empty())
51 pwarning() << String::format(
"Pending or waiting messages nb_pending={0} nb_waiting={1}",
52 m_pending_messages.size(), m_waiting_messages.size());
// Binds the exchanger to `item_family` and caches the
// `supportShmemVariables` service option.
// NOTE(review): the trace text below suggests this is the initialization
// entry point of the service - confirm against the class declaration.
59void BasicParticleExchanger::
// Remember the family whose items are handled by this exchanger.
64 m_item_family = item_family;
// Read the option once and keep its value in a member.
71 m_support_shmem_variables =
options()->supportShmemVariables();
// Trace the family name and the value of the cached option.
74 info() <<
"Initialize BasicParticleExchanger family=" << item_family->
name();
76 info() <<
"-- SupportShmemVariables = " << m_support_shmem_variables;
// Visits every message stored in m_accumulate_infos, then empties that
// container.
// NOTE(review): presumably each visited message is released before the
// container is cleared - confirm.
82void BasicParticleExchanger::
85 for (
auto msg : m_accumulate_infos)
87 m_accumulate_infos.clear();
// Resets the per-exchange state before a new exchange starts.
//
// i_nb_particle: particle count supplied by the caller; it is widened to
// Int64 into `nb_particle` below.
// NOTE(review): presumably the local particle count, used as input of a
// min/max/sum computation across ranks - confirm.
93void BasicParticleExchanger::
94beginNewExchange(
Integer i_nb_particle)
// Prefix of the trace message emitted below.
97 String function_id =
"BasicParticleExchanger::beginNewExchange >>> ";
// The debug level is re-read from the service options on every call.
101 m_debug_exchange_items_level =
options()->debugExchangeItemsLevel();
// Bookkeeping of the exchange is put back to its initial values.
106 m_last_nb_to_exchange = 0;
107 m_exchange_finished =
false;
108 m_print_info =
false;
109 m_current_nb_reduce = 0;
110 m_nb_particle_send = 0;
// 64-bit holders for the particle statistics (count, min, max, total).
113 Int64 nb_particle = i_nb_particle;
114 Int64 min_nb_particle = 0;
115 Int64 max_nb_particle = 0;
116 Int64 nb_total_particle = 0;
120 nb_total_particle, min_rank, max_rank);
// The statistics are traced only when the verbosity level is at least 1.
122 if (m_verbose_level >= 1)
123 info() << function_id <<
"** NB PARTICLES IN VOL total=" << nb_total_particle
124 <<
" min=" << min_nb_particle <<
" max=" << max_nb_particle
125 <<
" min_rank=" << min_rank <<
" max_rank=" << max_rank
128 m_nb_total_particle_finish_exchange = 0;
// Sends the particles listed in `local_ids`.
//
// nb_particle_finish_exchange: not used by this implementation.
// local_ids, sub_domains_to_send: forwarded unchanged to
// _generateSendItems().
142void BasicParticleExchanger::
143sendItems(
Integer nb_particle_finish_exchange,
147 ARCANE_UNUSED(nb_particle_finish_exchange);
// Keep the number of particles handed to this call.
149 m_nb_particle_send = local_ids.
size();
152 _generateSendItems(local_ids, sub_domains_to_send);
// Trace the last activation time reported by m_timer.
154 info(5) << A_FUNCINFO <<
"sendItems " <<
m_timer->lastActivationTime();
156 _sendPendingMessages();
// Sends the given particles through sendItems(), then returns the result
// of _waitMessages() called with 0 as first argument, the group
// `item_group` and no output array of local ids.
162bool BasicParticleExchanger::
163exchangeItems(
Integer nb_particle_finish_exchange,
170 sendItems(nb_particle_finish_exchange, local_ids, sub_domains_to_send);
// Extra handling applies only when a non-null group is supplied.
172 if (!item_group.
null())
175 return _waitMessages(0, item_group,
nullptr, functor);
// Sends the given particles through sendItems(), then returns the result
// of _waitMessages() called with 0 as first argument, a default-constructed
// ItemGroup and the caller's `new_particle_local_ids` array.
181bool BasicParticleExchanger::
182exchangeItems(
Integer nb_particle_finish_exchange,
189 sendItems(nb_particle_finish_exchange, local_ids, sub_domains_to_send);
// The output array is optional; when supplied it is emptied first.
191 if (new_particle_local_ids)
192 new_particle_local_ids->
clear();
194 return _waitMessages(0,
ItemGroup(), new_particle_local_ids, functor);
// Builds and queues the messages used to send the particles in `local_ids`,
// then removes those particles from the local family.
// NOTE(review): sendItems() calls _generateSendItems(local_ids,
// sub_domains_to_send), which presumably is this routine - confirm.
200void BasicParticleExchanger::
208 String func_name(
"BasicParticleExchanger::sendItems()");
// Ranks this subdomain communicates with, as reported by the cell family.
211 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
213 Integer nb_connected_sub_domain = communicating_sub_domains.
size();
// One send message is created per connected subdomain.
218 m_accumulate_infos.clear();
219 m_accumulate_infos.resize(nb_connected_sub_domain);
220 for (
Integer i = 0; i < nb_connected_sub_domain; ++i) {
221 m_accumulate_infos[i] =
new SerializeMessage(m_rank, communicating_sub_domains[i],
222 ISerializeMessage::MT_Send);
// NOTE(review): looks like test/debug code that queues an extra receive
// message on rank 0 - confirm whether it is compiled in.
225 if (m_rank==0 && i==0){
226 warning() <<
" WRONG MESSAGE";
228 ISerializeMessage::MT_Recv);
229 m_pending_messages.add(sm);
// Distribute the local ids into ids_to_send, one list per connected
// subdomain.
234 _addItemsToSend(local_ids, sub_domains_to_send, communicating_sub_domains, ids_to_send);
// Optional traces: number of items to send, globally and per subdomain.
236 if (m_debug_exchange_items_level >= 1) {
237 info() <<
"-- Subdomain " << m_rank <<
". NB to send: " << local_ids.
size()
238 <<
" NB connected subdomain: " << nb_connected_sub_domain;
239 debug() <<
"NB connected subdomain for " << m_rank <<
" : " << m_accumulate_infos.size();
240 for (Integer i = 0, s = m_accumulate_infos.size(); i < s; ++i) {
241 debug() <<
"NB for the subdomain " << m_accumulate_infos[i]->destRank()
242 <<
" " << ids_to_send[i].size();
// For each connected subdomain: fill its send message, queue it, and queue
// a receive message for the same remote rank.
249 for (Integer j = 0; j < nb_connected_sub_domain; ++j) {
250 ISerializeMessage* sm = m_accumulate_infos[j];
254 _serializeMessage(sm, ids_to_send[j], items_to_send_uid,
255 items_to_send_cells_uid);
257 m_pending_messages.add(sm);
260 auto* recv_sm =
new SerializeMessage(m_rank, sm->destination().value(),
261 ISerializeMessage::MT_Recv);
262 m_pending_messages.add(recv_sm);
// The send messages were added to m_pending_messages above; only the
// accumulation list itself is emptied here.
265 m_accumulate_infos.
clear();
// The particles that were serialized are removed from the local family.
267 m_item_family->toParticleFamily()->removeParticles(local_ids);
268 m_item_family->endUpdate();
// Sorts the particles to send by destination subdomain.
//
// local_ids: local ids of the particles to send.
// sub_domains_to_send: destination rank of each particle, indexed like
//   local_ids.
// communicating_sub_domains: ranks of the connected subdomains.
// ids_to_send: output; entry k receives the local ids whose destination is
//   communicating_sub_domains[k]. Its size gives the number of connected
//   subdomains used by this routine.
//
// A particle whose destination is the current rank raises a fatal error.
274void BasicParticleExchanger::
275_addItemsToSend(Int32ConstArrayView local_ids,
276 Int32ConstArrayView sub_domains_to_send,
277 Int32ConstArrayView communicating_sub_domains,
278 UniqueArray<SharedArray<Int32>>& ids_to_send)
280 const Int32 debug_exchange_items_level = m_debug_exchange_items_level;
281 Int32 nb_connected_sub_domain = ids_to_send.size();
284 Int32 id_size = local_ids.size();
285 for (Integer i = 0; i < id_size; ++i) {
286 Int32 item_local_id = local_ids[i];
287 Int32 sd_to_send = sub_domains_to_send[i];
288 if (sd_to_send == m_rank)
290 ARCANE_FATAL(
"The entity with local index {0} should not be sent to its own subdomain",
// `sd_index` starts at the out-of-range value nb_connected_sub_domain, which
// serves as the "destination not found" marker tested after the search.
295 Integer sd_index = nb_connected_sub_domain;
296 for (Integer i_sd = 0; i_sd < nb_connected_sub_domain; ++i_sd)
297 if (sd_to_send == communicating_sub_domains[i_sd]) {
301 if (sd_index == nb_connected_sub_domain)
303 ids_to_send[sd_index].add(item_local_id);
304 if (debug_exchange_items_level >= 1)
305 pinfo() <<
"ADD ITEM TO SEND lid=" << item_local_id <<
" sd_index=" << sd_index
306 <<
" sd=" << communicating_sub_domains[sd_index] <<
" n=" << ids_to_send[sd_index].size()
307 <<
" begin=" << ids_to_send[sd_index].data();
// Debug level >= 1: summary of what goes to each connected subdomain.
309 if (debug_exchange_items_level >= 1)
310 for (Integer i = 0; i < nb_connected_sub_domain; ++i)
311 pinfo() <<
"SEND INFO sd_index=" << i
312 <<
" sd=" << communicating_sub_domains[i] <<
" n=" << ids_to_send[i].size()
313 <<
" begin=" << ids_to_send[i].data();
// Detailed trace of at most the first five particles, enabled at debug
// level >= 2, or at level >= 1 when m_print_info is set.
315 if ((debug_exchange_items_level >= 1 && m_print_info) || debug_exchange_items_level >= 2) {
316 IParallelMng* pm = m_parallel_mng;
317 Int32 rank = pm->commRank();
318 ParticleInfoListView items(m_item_family);
319 Integer nb_print = math::min(5, id_size);
320 for (Integer i = 0; i < nb_print; ++i) {
321 Particle part(items[local_ids[i]]);
322 pinfo() <<
" RANK=" << rank <<
" LID=" << local_ids[i] <<
" SD=" << sub_domains_to_send[i]
323 <<
" uid=" << part.uniqueId() <<
" cell=" << ItemPrinter(part.cell());
// Forwards to _waitMessages() with a default-constructed ItemGroup and
// returns its result.
332bool BasicParticleExchanger::
335 return _waitMessages(nb_pending_particle,
ItemGroup(), new_particle_local_ids, functor);
// Waits for the current messages, then decides whether the exchange is
// finished. Returns m_exchange_finished.
341bool BasicParticleExchanger::
345 String func_name =
"BasicParticleExchanger::waitMessages";
// Receive and process the messages currently in flight.
346 _waitMessages(item_group, new_particle_local_ids, functor);
// A reduction is wanted once the current counter exceeds the value kept
// from the last finished exchange, or exceeds the configured maximum
// (a maximum of -1 disables that second test).
348 bool do_reduce = m_current_nb_reduce > m_last_nb_reduce;
349 if (m_max_nb_message_without_reduce != (-1))
350 do_reduce |= m_current_nb_reduce > m_max_nb_message_without_reduce;
353 Int64 nb_to_exchange = 0;
// Local contribution: particles just sent plus those still pending.
357 Int64 current_exchange = m_nb_particle_send + nb_pending_particle;
360 info(4) << func_name <<
"TimeReduce=" << m_timer->lastActivationTime()
361 <<
" nbtoexchange=" << nb_to_exchange;
// The exchange is over when nothing is left to exchange.
362 m_exchange_finished = (nb_to_exchange == 0);
// NOTE(review): looks like a stall detector (same non-zero count as the
// previous pass and more than 300 loops) - confirm what it triggers and
// where the threshold 300 comes from.
363 if (nb_to_exchange > 0 && m_last_nb_to_exchange == nb_to_exchange && m_nb_loop > 300) {
366 m_last_nb_to_exchange = nb_to_exchange;
369 ++m_current_nb_reduce;
370 debug() << func_name <<
" ** RETURN EXCHANGE m_exchange_finished: " << m_exchange_finished;
371 if (m_exchange_finished) {
// NOTE(review): the offset 4 is not explained here - confirm its rationale.
372 m_last_nb_reduce = m_current_nb_reduce - 4;
373 if (m_verbose_level >= 1)
374 info() << func_name <<
" exchange finished "
375 <<
" n=" << m_current_nb_reduce
376 <<
" date=" << platform::getCurrentDateTime();
378 return m_exchange_finished;
// Runs the caller's functor, waits for all messages of m_message_list, then
// deserializes every message that was in the waiting list.
//
// item_group: forwarded to _deserializeMessage().
// new_particle_local_ids: forwarded to _deserializeMessage().
// functor: executed before waiting; its duration is accumulated in
//   m_total_time_functor.
384void BasicParticleExchanger::
385_waitMessages(ItemGroup item_group, Int32Array* new_particle_local_ids, IFunctor* functor)
// Time the execution of the functor.
389 Timer::Sentry ts(m_timer);
390 functor->executeFunctor();
392 m_total_time_functor += m_timer->lastActivationTime();
393 if (m_debug_exchange_items_level >= 1)
394 info() <<
"TimeFunctor: current=" << m_timer->lastActivationTime()
395 <<
" total=" << m_total_time_functor;
// Time the wait for all messages.
399 Timer::Sentry ts(m_timer);
400 m_message_list->waitMessages(Parallel::WaitAll);
402 m_total_time_waiting += m_timer->lastActivationTime();
403 if (m_debug_exchange_items_level >= 1)
404 info() <<
"TimeWaiting: current=" << m_timer->lastActivationTime()
405 <<
" total=" << m_total_time_waiting;
// Work on a copy of the waiting list; the member list is emptied at once.
409 UniqueArray<ISerializeMessage*> current_messages(m_waiting_messages);
410 m_waiting_messages.clear();
412 Int32 nb_end_update = 0;
413 Int32 max_nb_messages = 0;
// With shared-memory variable support, the largest message count over the
// ranks is obtained through a max reduction.
414 if (m_support_shmem_variables) {
415 max_nb_messages = m_parallel_mng->reduce(MessagePassing::ReduceMax, current_messages.size());
422 for (ISerializeMessage* sm : current_messages) {
424 _deserializeMessage(sm, items_to_create_unique_id, items_to_create_cells_unique_id,
425 items_to_create_local_id, items_to_create_cells_local_id,
426 item_group, new_particle_local_ids);
// NOTE(review): presumably pads the number of endUpdate() calls so that
// every rank performs max_nb_messages of them - confirm.
432 if (m_support_shmem_variables) {
433 for (; nb_end_update < max_nb_messages; ++nb_end_update) {
434 m_item_family->endUpdate();
// The waiting list is required to be empty at this point.
438 if (!m_waiting_messages.empty())
439 ARCANE_FATAL(
"Pending messages n={0}", m_waiting_messages.size());
// Moves every message of m_pending_messages into m_message_list and into
// m_waiting_messages, asks the list to process them, then empties the
// pending list.
445void BasicParticleExchanger::
446_sendPendingMessages()
448 IParallelMng* pm = m_parallel_mng;
// NOTE(review): presumably the message list is created on first use through
// `pm` - confirm.
450 if (!m_message_list.get())
454 Timer::Sentry ts(m_timer);
456 Integer nb_message = m_pending_messages.size();
457 for (Integer i = 0; i < nb_message; ++i) {
458 m_message_list->addMessage(m_pending_messages[i]);
459 m_waiting_messages.add(m_pending_messages[i]);
461 m_message_list->processPendingMessages();
462 m_pending_messages.clear();
// Writes into the serializer of `sm` the particles whose local ids are in
// `acc_ids`.
//
// sm: message whose serializer is filled.
// acc_ids: local ids of the particles to serialize.
// items_to_send_uid: work array, resized to the particle count and filled
//   with the particle unique ids.
// items_to_send_cells_uid: work array, resized to the particle count and
//   filled with the unique id of each particle's cell, or
//   NULL_ITEM_UNIQUE_ID when the particle has no cell.
//
// The serializer is used in two passes: a reserve pass followed by a put
// pass that issues the same sequence of values.
469void BasicParticleExchanger::
470_serializeMessage(ISerializeMessage* sm,
471 Int32ConstArrayView acc_ids,
472 Int64Array& items_to_send_uid,
473 Int64Array& items_to_send_cells_uid)
475 ParticleInfoListView internal_items(m_item_family);
// Pass 1: reserve room for everything written in pass 2.
477 ISerializer* sbuf = sm->serializer();
478 sbuf->setMode(ISerializer::ModeReserve);
482 Integer nb_item = acc_ids.size();
486 sbuf->reserveInt64(1);
488 sbuf->reserveInt64(1);
489 sbuf->reserveSpan(eBasicDataType::Int64, nb_item);
492 sbuf->reserveSpan(eBasicDataType::Int64, nb_item);
494 for (VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var;) {
495 IVariable* var = *i_var;
496 var->serialize(sbuf, acc_ids);
500 sbuf->allocateBuffer();
502 if (m_debug_exchange_items_level >= 1)
503 info() <<
"BSE_SerializeMessage nb_item=" << nb_item
504 <<
" id=" << m_serialize_id
505 <<
" dest=" << sm->destination();
// Pass 2: write the serialize id, the particle count, the two unique-id
// arrays, then the values of every exchanged variable.
507 sbuf->setMode(ISerializer::ModePut);
509 sbuf->putInt64(m_serialize_id);
512 sbuf->putInt64(nb_item);
513 items_to_send_uid.resize(nb_item);
514 items_to_send_cells_uid.resize(nb_item);
515 for (Integer z = 0; z < nb_item; ++z) {
516 Particle item = internal_items[acc_ids[z]];
517 items_to_send_uid[z] = item.uniqueId();
518 bool has_cell = item.hasCell();
519 items_to_send_cells_uid[z] = (has_cell) ? item.cell().uniqueId() : NULL_ITEM_UNIQUE_ID;
// NOTE(review): the trace below calls item.cell().owner() without testing
// has_cell - confirm this is safe for a particle without a cell.
520 if (m_debug_exchange_items_level >= 2) {
521 info() <<
"Particle BufID=" << acc_ids[z]
522 <<
" LID=" << item.localId()
523 <<
" UID=" << items_to_send_uid[z]
524 <<
" CellIUID=" << items_to_send_cells_uid[z]
525 <<
" (owner=" << item.cell().owner() <<
")";
528 sbuf->putSpan(items_to_send_uid);
529 sbuf->putSpan(items_to_send_cells_uid);
531 for (VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var;) {
532 IVariable* var = *i_var;
533 var->serialize(sbuf, acc_ids);
// Reads one received message and creates the particles it describes in the
// local family.
//
// message: message whose serializer is read.
// items_to_create_unique_id, items_to_create_cells_unique_id,
// items_to_create_local_id, items_to_create_cells_local_id: work arrays,
//   resized here to the number of particles read from the message.
// item_group: when not null, the created particles are added to it.
// new_particle_local_ids: when not null, the local ids of the created
//   particles are appended to it.
540void BasicParticleExchanger::
541_deserializeMessage(ISerializeMessage* message,
542 Int64Array& items_to_create_unique_id,
543 Int64Array& items_to_create_cells_unique_id,
544 Int32Array& items_to_create_local_id,
545 Int32Array& items_to_create_cells_local_id,
546 ItemGroup item_group,
547 Int32Array* new_particle_local_ids)
550 IMesh* mesh = m_item_family->mesh();
551 ISerializer* sbuf = message->serializer();
552 IItemFamily* cell_family = mesh->
cellFamily();
555 sbuf->setMode(ISerializer::ModeGet);
556 sbuf->setReadMode(ISerializer::ReadReplace);
// Values are read in the order they are written by _serializeMessage():
// serialize id, particle count, particle unique ids, cell unique ids.
559 Int64 serialize_id = sbuf->getInt64();
560 Int64 nb_item = sbuf->getInt64();
561 if (m_debug_exchange_items_level >= 1)
562 info() <<
"BSE_DeserializeMessage id=" << serialize_id <<
" nb=" << nb_item
563 <<
" orig=" << message->destination();
565 items_to_create_local_id.resize(nb_item);
566 items_to_create_unique_id.resize(nb_item);
567 items_to_create_cells_unique_id.resize(nb_item);
568 items_to_create_cells_local_id.resize(nb_item);
569 sbuf->getSpan(items_to_create_unique_id);
570 sbuf->getSpan(items_to_create_cells_unique_id);
571 if (m_debug_exchange_items_level >= 2) {
573 for (Integer z = 0; z < nb_item; ++z) {
574 info() <<
"Particle UID=" << items_to_create_unique_id[z]
575 <<
" CellIUID=" << items_to_create_cells_unique_id[z];
// Convert the received cell unique ids into local ids of the cell family,
// then create the particles attached to those cells.
579 items_to_create_cells_local_id.resize(nb_item);
580 cell_family->itemsUniqueIdToLocalId(items_to_create_cells_local_id, items_to_create_cells_unique_id);
582 m_item_family->toParticleFamily()->addParticles(items_to_create_unique_id,
583 items_to_create_cells_local_id,
584 items_to_create_local_id);
588 m_item_family->endUpdate();
592 ParticleInfoListView internal_items(m_item_family);
// Every created particle gets the current rank as owner.
594 for (Integer z = 0; z < nb_item; ++z) {
595 Particle item = internal_items[items_to_create_local_id[z]];
598 item.mutableItemBase().setOwner(m_rank, m_rank);
600 if (!item_group.null())
601 item_group.addItems(items_to_create_local_id,
false);
602 if (new_particle_local_ids)
603 new_particle_local_ids->addRange(items_to_create_local_id);
// Finally read the values of the exchanged variables for the new particles.
604 for (VariableCollection::Enumerator i_var(m_variables_to_exchange); ++i_var;) {
605 IVariable* var = *i_var;
606 var->serialize(sbuf, items_to_create_local_id);
// Tests whether messages are still present in the waiting list.
// NOTE(review): presumably a non-empty waiting list is treated as an error
// here - confirm.
614void BasicParticleExchanger::
617 if (!m_waiting_messages.empty())
// nb_particle: explicitly marked as unused by this implementation.
625void BasicParticleExchanger::
626addNewParticles(
Integer nb_particle)
628 ARCANE_UNUSED(nb_particle);
635void BasicParticleExchanger::
// NOTE(review): presumably registers the BasicParticleExchanger class as
// the implementation of the service named BasicParticleExchanger - confirm
// against the macro definition.
646ARCANE_REGISTER_SERVICE_BASICPARTICLEEXCHANGER(BasicParticleExchanger, BasicParticleExchanger);
649 BasicParticleExchanger);
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCANE_REGISTER_SUB_DOMAIN_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
Integer size() const
Nombre d'éléments du vecteur.
CaseOptionsBasicParticleExchanger * options() const
Options du jeu de données du service.
void clear()
Supprime les éléments du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Interface d'une famille d'entités.
virtual String name() const =0
Nom de la famille.
virtual IMesh * mesh() const =0
Maillage associé
virtual IItemFamily * cellFamily()=0
Retourne la famille des mailles.
virtual IParallelMng * parallelMng()=0
Gestionnaire de parallélisme.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calcule en une opération la somme, le min, le max d'une valeur.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual Ref< ISerializeMessageList > createSerializeMessageListRef()=0
Crée une liste pour gérer les 'ISerializeMessage'.
virtual char reduce(eReduceType rt, char v)=0
Effectue la réduction de type rt sur le réel v et retourne la valeur.
Interface d'un échangeur de particules.
Groupe d'entités de maillage.
void clear()
Supprime les entités du groupe.
bool null() const
true si le groupe est le groupe nul
Interface d'un message de sérialisation entre IMessagePassingMng.
Exception lorsqu'une fonction n'est pas implémentée.
Message utilisant un SerializeBuffer.
Chaîne de caractères unicode.
Positionne la phase de l'action en cours d'exécution.
Sentinelle pour le timer. La sentinelle associée à un timer permet de déclencher celui-ci au moment d...
@ TimerReal
Timer utilisant le temps réel.
TraceMessage info() const
Flot pour un message d'information.
Vecteur 1D de données avec sémantique par valeur (style STL).
VariableList m_variables_to_exchange
List of variables to exchange.
Int32 m_max_nb_message_without_reduce
void sendItems(Integer nb_particle_finish_exchange, Int32ConstArrayView local_ids, Int32ConstArrayView sub_domains_to_send) override
@ ReduceSum
Somme des valeurs.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Array< Int32 > Int32Array
Tableau dynamique à une dimension d'entiers 32 bits.
std::int32_t Int32
Type entier signé sur 32 bits.