14#include "arcane/mesh/NonBlockingParticleExchanger.h"
16#include "arcane/utils/List.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/IFunctor.h"
19#include "arcane/utils/PlatformUtils.h"
21#include "arcane/ItemGroup.h"
22#include "arcane/ItemVector.h"
23#include "arcane/IItemFamily.h"
24#include "arcane/IParticleFamily.h"
25#include "arcane/IParallelMng.h"
26#include "arcane/IVariableMng.h"
27#include "arcane/IVariable.h"
28#include "arcane/IMesh.h"
29#include "arcane/Item.h"
30#include "arcane/Timer.h"
31#include "arcane/SerializeMessage.h"
32#include "arcane/ISerializeMessageList.h"
33#include "arcane/CommonVariables.h"
34#include "arcane/Timer.h"
35#include "arcane/FactoryService.h"
48NonBlockingParticleExchanger::
49NonBlockingParticleExchanger(
const ServiceBuildInfo& sbi)
51, m_item_family(nullptr)
52, m_parallel_mng(sbi.mesh()->parallelMng())
53, m_rank(m_parallel_mng->commRank())
54, m_timer(new Timer(m_parallel_mng->timerMng(),
"NonBlockingParticleExchanger",Timer::TimerReal))
55, m_total_time_functor(0.)
56, m_total_time_waiting(0.)
57, m_nb_total_particle_finish_exchange(0)
58, m_nb_total_particle(0)
59, m_nb_original_blocking_size(0)
60, m_nb_blocking_size(m_nb_original_blocking_size)
61, m_exchange_finished(true)
63, m_need_general_receive(false)
64, m_end_message_sended(false)
65, m_can_process_messages(true)
66, m_can_process_non_blocking(false)
67, m_want_process_non_blocking(false)
68, m_want_fast_send_particles(true)
69, m_nb_receive_message(0)
70, m_nb_particle_finished_exchange(0)
77#if ARCANE_DEBUG_EXCHANGE_ITEMS
85NonBlockingParticleExchanger::
86~NonBlockingParticleExchanger()
90 if (!m_pending_messages.empty() || !m_waiting_messages.empty()){
91 String s = String::format(
"pending or waiting messages: nb_pending={0} nb_waiting=",
92 m_pending_messages.size(),m_waiting_messages.size());
96 if (!m_waiting_local_ids.empty()){
97 warning() << String::format(
"pending particles: nb_pending=",m_waiting_local_ids.size());
106void NonBlockingParticleExchanger::
115void NonBlockingParticleExchanger::
118 for( Integer i=0, is=m_accumulate_infos.size(); i<is; ++i ){
119 delete m_accumulate_infos[i];
120 m_accumulate_infos[i] = 0;
122 m_accumulate_infos.clear();
128void NonBlockingParticleExchanger::
134 m_end_message_sended =
false;
135 m_exchange_finished =
false;
136 m_nb_blocking_size = m_nb_original_blocking_size;
137 m_nb_receive_message = 0;
138 m_nb_particle_finished_exchange = 0;
143 info() <<
"BEGIN TRACKING TOTAL FLYING = " << m_nb_total_particle
145 <<
" (Date=" << platform::getCurrentDateTime() <<
")";
147 m_nb_total_particle_finish_exchange = 0;
149 m_need_general_receive =
true;
155 m_variables_to_exchange.clear();
156 m_item_family->usedVariables(m_variables_to_exchange);
157 m_variables_to_exchange.sortByName(
true);
163bool NonBlockingParticleExchanger::
177bool NonBlockingParticleExchanger::
192void NonBlockingParticleExchanger::
199 _sendPendingMessages();
205bool NonBlockingParticleExchanger::
214void NonBlockingParticleExchanger::
238bool NonBlockingParticleExchanger::
239_exchangeItems(Int32ConstArrayView local_ids,
240 Int32ConstArrayView sub_domains_to_send,ItemGroup item_group,
241 Int32Array* new_particle_local_ids,
244 if (m_want_fast_send_particles){
245 if (local_ids.empty())
246 _processFinishTrackingMessage();
249 if (!m_want_process_non_blocking)
250 _processFinishTrackingMessage();
251 if (m_exchange_finished){
252 _sendFinishExchangeParticle();
253 m_need_general_receive =
false;
255 _checkNeedReceiveMessage();
257 _checkSendItems(local_ids,sub_domains_to_send);
259 return _waitMessages(0,item_group,new_particle_local_ids,functor);
265bool NonBlockingParticleExchanger::
266_waitMessages(Integer nb_pending_particle,ItemGroup item_group,
267 Int32Array* new_particle_local_ids,IFunctor* functor)
269 ARCANE_UNUSED(nb_pending_particle);
271 if (!item_group.null())
274 m_can_process_messages =
true;
275 m_can_process_non_blocking = m_want_process_non_blocking;
276 while (m_can_process_messages && !m_exchange_finished){
277 m_can_process_messages =
false;
278 _processMessages(item_group,new_particle_local_ids,
false,functor);
281 if (m_exchange_finished){
282 info(5) <<
" ** EXCHANGE finished: ";
285 _processMessages(item_group,new_particle_local_ids,
true,0);
286 info(5) <<
" ** EXCHANGE finished END: ";
289 info(5) <<
" ** RETURN EXCHANGE m_exchange_finished: " << m_exchange_finished;
290 return m_exchange_finished;
296void NonBlockingParticleExchanger::
297_checkNeedReceiveMessage()
299 if (m_need_general_receive){
300 auto sm =
new SerializeMessage(m_rank,A_NULL_RANK,ISerializeMessage::MT_Recv);
301 m_pending_messages.add(sm);
302 m_need_general_receive =
false;
309void NonBlockingParticleExchanger::
310_generateSendItemsMessages(Int32ConstArrayView local_ids,
311 Int32ConstArrayView sub_domains_to_send)
313 Timer::Phase tphase(m_parallel_mng->timeStats(),TP_Communication);
315 IMesh* mesh = m_item_family->mesh();
318 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
320 Integer nb_connected_sub_domain = communicating_sub_domains.size();
322 UniqueArray< SharedArray<Int32> > ids_to_send(nb_connected_sub_domain);
325 m_accumulate_infos.clear();
326 m_accumulate_infos.resize(nb_connected_sub_domain);
327 for( Integer i=0; i<nb_connected_sub_domain; ++i )
328 m_accumulate_infos[i] =
new SerializeMessage(m_rank,communicating_sub_domains[i],
329 ISerializeMessage::MT_Send);
331 _addItemsToSend(local_ids,sub_domains_to_send,communicating_sub_domains,ids_to_send);
332 _addItemsToSend(m_waiting_local_ids,m_waiting_sub_domains_to_send,
333 communicating_sub_domains,ids_to_send);
337 info() <<
"-- Subdomain " << m_rank <<
". NB to send: " << local_ids.size()
338 <<
" NB connected subdomains: " << nb_connected_sub_domain;
340 info() <<
"NB connected subdomain for " << m_rank <<
" : " << m_accumulate_infos.size();
341 for( Integer i=0, n=m_accumulate_infos.size(); i<n; ++i ){
342 info() <<
"------------- Send: rank=" << m_accumulate_infos[i]->destRank()
343 <<
" n=" << ids_to_send[i].size();
350 for( Integer j=0; j<nb_connected_sub_domain; ++j ){
351 ISerializeMessage* sm = m_accumulate_infos[j];
355 if (!ids_to_send[j].empty())
356 _serializeMessage(sm,ids_to_send[j],items_to_send_uid,
357 items_to_send_cells_uid);
363 m_accumulate_infos.clear();
366 info(5)<<
"NonBlockingParticleExchanger:: sendItems " <<
"local_ids "<<local_ids.size();
367 info(5)<<
"NonBlockingParticleExchanger:: sendItems " <<
"m_waiting_local_ids "<<m_waiting_local_ids.size();
369 m_item_family->toParticleFamily()->removeParticles(local_ids);
370 m_item_family->toParticleFamily()->removeParticles(m_waiting_local_ids);
371 m_item_family->endUpdate();
372 m_waiting_local_ids.clear();
373 m_waiting_sub_domains_to_send.clear();
379void NonBlockingParticleExchanger::
380_addItemsToSend(Int32ConstArrayView local_ids,
381 Int32ConstArrayView sub_domains_to_send,
382 Int32ConstArrayView communicating_sub_domains,
383 UniqueArray<SharedArray<Int32> >& ids_to_send)
385 String func_name(
"NonBlockingParticleExchanger::_addItemsToSend()");
386 Integer nb_connected_sub_domain = ids_to_send.size();
389 Integer id_size = local_ids.size();
390 for( Integer i=0; i<id_size; ++i ){
391 Int32 item_local_id = local_ids[i];
392 Integer sd_to_send = sub_domains_to_send[i];
394 if (sd_to_send==m_rank)
396 fatal() << func_name <<
"The entity with local id " << item_local_id
397 <<
" should not be sent to its own subdomain";
402 Integer sd_index = nb_connected_sub_domain;
403 for( Integer i_sd=0; i_sd<nb_connected_sub_domain; ++i_sd )
404 if (sd_to_send==communicating_sub_domains[i_sd]){
409 if (sd_index==nb_connected_sub_domain)
410 fatal() << func_name <<
"Internal: bad subdomain index";
412 ids_to_send[sd_index].add(item_local_id);
419void NonBlockingParticleExchanger::
420_processMessages(ItemGroup item_group,Int32Array* new_particle_local_ids,
421 bool wait_all,IFunctor* functor)
423 _sendPendingMessages();
427 Timer::Sentry ts(m_timer);
428 functor->executeFunctor();
430 m_total_time_functor += m_timer->lastActivationTime();
431 info(5) <<
"TimeFunctor: current=" << m_timer->lastActivationTime()
432 <<
" total=" << m_total_time_functor;
435 Integer nb_message_finished = 0;
437 Timer::Sentry ts(m_timer);
439 nb_message_finished = m_message_list->waitMessages(Parallel::WaitAll);
441 if (m_can_process_non_blocking)
442 nb_message_finished = m_message_list->waitMessages(Parallel::WaitSomeNonBlocking);
444 nb_message_finished = m_message_list->waitMessages(Parallel::WaitSome);
446 if (nb_message_finished==0){
447 m_can_process_non_blocking =
false;
448 m_can_process_messages =
true;
449 _processFinishTrackingMessage();
454 m_total_time_waiting += m_timer->lastActivationTime();
455 info(5) <<
"TimeWaiting: current=" << m_timer->lastActivationTime()
456 <<
" total=" << m_total_time_waiting;
460 UniqueArray<ISerializeMessage*> current_messages(m_waiting_messages);
461 m_waiting_messages.clear();
465 for( Integer i=0, is=current_messages.size(); i<is; ++i ){
466 ISerializeMessage* sm = current_messages[i];
469 _deserializeMessage(sm,items_to_create_id,items_to_create_cells_id,item_group,new_particle_local_ids);
470 ++m_nb_receive_message;
475 m_waiting_messages.add(sm);
482void NonBlockingParticleExchanger::
483_sendPendingMessages()
485 IParallelMng* pm = m_parallel_mng;
487 _checkNeedReceiveMessage();
489 if (!m_message_list.get())
493 Timer::Sentry ts(m_timer);
495 Integer nb_message = m_pending_messages.size();
496 for( Integer i=0; i<nb_message; ++i ){
497 m_message_list->addMessage(m_pending_messages[i]);
498 m_waiting_messages.add(m_pending_messages[i]);
500 m_message_list->processPendingMessages();
501 m_pending_messages.clear();
503 info(5) <<
"TimeSendMessages=" << m_timer->lastActivationTime()
504 <<
" buffersize=" << m_waiting_local_ids.size();
510void NonBlockingParticleExchanger::
511_serializeMessage(ISerializeMessage* sm,
512 Int32ConstArrayView acc_ids,
513 Int64Array& items_to_send_uid,
514 Int64Array& items_to_send_cells_uid)
516 ParticleInfoListView internal_items(m_item_family);
518 ISerializer* sbuf = sm->serializer();
519 sbuf->setMode(ISerializer::ModeReserve);
523 Integer nb_item = acc_ids.size();
525 sbuf->reserveInteger(1);
526 if (m_want_fast_send_particles){
528 sbuf->reserve(DT_Int64,1);
531 sbuf->reserve(DT_Int32,1);
533 sbuf->reserve(DT_Int64,1);
535 sbuf->reserveSpan(DT_Int64,nb_item);
537 sbuf->reserveSpan(DT_Int64,nb_item);
539 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
540 IVariable* var = *i_var;
541 var->serialize(sbuf,acc_ids);
545 sbuf->allocateBuffer();
546 sbuf->setMode(ISerializer::ModePut);
548 sbuf->putInteger(MESSAGE_EXCHANGE);
549 if (m_want_fast_send_particles){
550 sbuf->putInt64(m_nb_particle_finished_exchange);
551 m_nb_particle_finished_exchange = 0;
553 sbuf->putInt32(m_rank);
554 sbuf->putInt64(nb_item);
555 items_to_send_uid.resize(nb_item);
556 items_to_send_cells_uid.resize(nb_item);
558 for( Integer z=0; z<nb_item; ++z ){
559 Particle item = internal_items[acc_ids[z]];
560 items_to_send_uid[z] = item.uniqueId();
561 bool has_cell = item.hasCell();
562 items_to_send_cells_uid[z] = (has_cell) ? item.cell().uniqueId() : NULL_ITEM_UNIQUE_ID;
563#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
565 info() <<
"Particle BufID=" << acc_ids[z]
566 <<
" LID=" << item.localId()
567 <<
" UID=" << items_to_send_uid[z]
568 <<
" CellIUID=" << items_to_send_cells_uid[z]
569 <<
" (owner=" << item.cell().owner() <<
")";
573 sbuf->putSpan(items_to_send_uid);
574 sbuf->putSpan(items_to_send_cells_uid);
576 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
577 IVariable* var = *i_var;
578 var->serialize(sbuf,acc_ids);
581 m_pending_messages.add(sm);
587void NonBlockingParticleExchanger::
588_deserializeMessage(ISerializeMessage* message,
589 Int64Array& items_to_create_unique_id,
590 Int64Array& items_to_create_cells_id,
591 ItemGroup item_group,
592 Int32Array* new_particle_local_ids)
595 IMesh* mesh = m_item_family->mesh();
596 ISerializer* sbuf = message->serializer();
599 sbuf->setMode(ISerializer::ModeGet);
600 sbuf->setReadMode(ISerializer::ReadReplace);
604 Integer message_type = sbuf->getInteger();
605 info(4) <<
"Deserialise message_type=" << (int)message_type;
606 switch(message_type){
607 case MESSAGE_EXCHANGE:
609 m_need_general_receive =
true;
610 if (m_want_fast_send_particles){
611 Int64 nb_finished = sbuf->getInt64();
612 m_nb_particle_finished_exchange += nb_finished;
614 Int32 orig_rank = sbuf->getInt32();
615 Int64 nb_item = sbuf->getInt64();
617 info() <<
"------------- Receive: rank=" << orig_rank <<
" particle nb=" << nb_item
618 <<
" (orig_rank=" << message->destination() <<
")";
622 items_to_create_local_id.resize(nb_item);
623 items_to_create_unique_id.resize(nb_item);
624 items_to_create_cells_id.resize(nb_item);
625 sbuf->getSpan(items_to_create_unique_id);
626 sbuf->getSpan(items_to_create_cells_id);
627#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
630 for( Integer z=0; z<nb_item; ++z ){
631 info() <<
"Particle UID=" << items_to_create_unique_id[z]
632 <<
" CellIUID=" << items_to_create_cells_id[z];
636 cells_lid.resize(nb_item);
637 mesh->cellFamily()->itemsUniqueIdToLocalId(cells_lid,items_to_create_cells_id);
639 items_to_create_local_id.resize(nb_item);
640 ParticleVectorView particles_view = m_item_family->toParticleFamily()->addParticles(items_to_create_unique_id,
642 items_to_create_local_id);
643 info(5) <<
"Nb create=" << particles_view.size();
647 m_item_family->endUpdate();
651 ParticleInfoListView internal_items(m_item_family);
655 for( Integer z=0; z<nb_item; ++z ){
656 Particle item = internal_items[items_to_create_local_id[z]];
659 item.mutableItemBase().setOwner(m_rank,m_rank);
661 if (!item_group.null())
662 item_group.addItems(items_to_create_local_id,
false);
663 if (new_particle_local_ids)
664 new_particle_local_ids->addRange(items_to_create_local_id);
666 for( VariableCollection::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
667 IVariable* var = *i_var;
668 var->serialize(sbuf,items_to_create_local_id);
672 case MESSAGE_NB_FINISH_EXCHANGE:
674 m_need_general_receive =
true;
677 m_can_process_messages =
true;
678 Int64 nb_particle = sbuf->getInt64();
679 Int32 orig_rank = sbuf->getInt32();
681 info() <<
"MESSAGE_NB_FINISH_EXCHANGE nb=" << nb_particle <<
" (from rank=" << orig_rank <<
")";
682 _addFinishExchangeParticle(nb_particle);
685 case MESSAGE_FINISH_EXCHANGE_STATUS:
687 m_nb_total_particle_finish_exchange = sbuf->getInt64();
688 m_exchange_finished = (m_nb_total_particle_finish_exchange==m_nb_total_particle);
690 info() <<
"** RECEIVING FINISH EXCHANGE " << m_exchange_finished
691 <<
" finish=" << m_nb_total_particle_finish_exchange
692 <<
" total=" << m_nb_total_particle;
698 case MESSAGE_CHANGE_BLOCKING:
700 m_need_general_receive =
true;
702 Integer nb_blocking_size = sbuf->getInteger();
706 if (nb_blocking_size<m_nb_blocking_size)
707 m_nb_blocking_size = nb_blocking_size;
708 info(4) <<
"** RECEIVING CHANGE BLOCKING"
709 <<
" new_blocking_size=" << m_nb_blocking_size;
712 if (m_waiting_local_ids.size()>0)
722void NonBlockingParticleExchanger::
725 if (!m_waiting_messages.empty())
733void NonBlockingParticleExchanger::
734_processFinishTrackingMessage()
743 if (m_rank==m_master_proc){
744 _addFinishExchangeParticle(m_nb_particle_finished_exchange);
748 if (m_nb_particle_finished_exchange!=0) {
749 info(4) <<
"Send to master proc (" << m_master_proc <<
") nb_finish=" << m_nb_particle_finished_exchange;
750 SerializeMessage* sm =
new SerializeMessage(m_rank,m_master_proc,ISerializeMessage::MT_Send);
751 ISerializer* sbuf = sm->serializer();
752 sbuf->setMode(ISerializer::ModeReserve);
753 sbuf->reserveInteger(1);
754 sbuf->reserve(DT_Int64,1);
755 sbuf->reserve(DT_Int32,1);
756 sbuf->allocateBuffer();
757 sbuf->setMode(ISerializer::ModePut);
758 sbuf->putInteger(MESSAGE_NB_FINISH_EXCHANGE);
759 sbuf->putInt64(m_nb_particle_finished_exchange);
760 sbuf->putInt32(m_rank);
761 m_pending_messages.add(sm);
764 m_nb_particle_finished_exchange = 0;
770void NonBlockingParticleExchanger::
771_sendFinishExchangeParticle()
773 Int32 nb_rank = m_parallel_mng->commSize();
774 if (m_rank!=m_master_proc || m_end_message_sended)
776 m_end_message_sended =
true;
777 info(4) <<
" ** ** SEND FINISH EXCHANGE PARTICLE2";
778 for( Integer i=0; i<nb_rank; ++i ){
779 if (i==m_master_proc)
781 SerializeMessage* sm =
new SerializeMessage(m_rank,i,ISerializeMessage::MT_Send);
782 ISerializer* sbuf = sm->serializer();
783 sbuf->setMode(ISerializer::ModeReserve);
784 sbuf->reserveInteger(1);
785 sbuf->reserve(DT_Int64,1);
786 sbuf->allocateBuffer();
787 sbuf->setMode(ISerializer::ModePut);
788 sbuf->putInteger(MESSAGE_FINISH_EXCHANGE_STATUS);
789 sbuf->putInt64(m_nb_total_particle_finish_exchange);
790 m_pending_messages.add(sm);
797void NonBlockingParticleExchanger::
798_addFinishExchangeParticle(
Int64 nb_particle_finish_exchange)
800 m_nb_total_particle_finish_exchange += nb_particle_finish_exchange;
801 Int32 nb_rank = m_parallel_mng->commSize();
802 Int64 nb_rank_as_int64 = nb_rank;
804 info(4) <<
"** RECEIVING FINISH EXCHANGE n=" << nb_particle_finish_exchange
805 <<
" totalfinish=" << m_nb_total_particle_finish_exchange
806 <<
" total=" << m_nb_total_particle;
808 Int64 remaining_particle = m_nb_total_particle - m_nb_total_particle_finish_exchange;
809 if (remaining_particle==0){
810 m_exchange_finished =
true;
811 m_need_general_receive =
false;
812 info() <<
"** ** FINISH TRACKING NB_RECV=" << m_nb_receive_message
813 <<
" (Date=" << platform::getCurrentDateTime() <<
")";
814 _sendFinishExchangeParticle();
816 else if (remaining_particle<(m_nb_blocking_size*nb_rank_as_int64)){
819 m_nb_blocking_size = 0;
820 warning() <<
"** ** CHANGE BLOCKING NEW_SIZE " << m_nb_blocking_size
821 <<
" REMAING_PARTICLE " << remaining_particle
822 <<
" (Date=" << platform::getCurrentDateTime() <<
")";
826 if (m_waiting_local_ids.size()>0)
828 for(
Int32 i=0; i<nb_rank; ++i ){
829 if (i==m_master_proc)
831 SerializeMessage* sm =
new SerializeMessage(m_rank,i,ISerializeMessage::MT_Send);
832 ISerializer* sbuf = sm->serializer();
833 sbuf->setMode(ISerializer::ModeReserve);
834 sbuf->reserveInteger(1);
835 sbuf->reserveInteger(1);
836 sbuf->allocateBuffer();
837 sbuf->setMode(ISerializer::ModePut);
838 sbuf->putInteger(MESSAGE_CHANGE_BLOCKING);
839 sbuf->putInteger(m_nb_blocking_size);
840 m_pending_messages.add(sm);
849void NonBlockingParticleExchanger::
861 NonBlockingParticleExchanger);
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
Tableau d'items de types quelconques.
Interface d'une famille d'entités.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Ref< ISerializeMessageList > createSerializeMessageListRef()=0
Créé une liste pour gérer les 'ISerializeMessage'.
virtual char reduce(eReduceType rt, char v)=0
Effectue la réduction de type rt sur le réel v et retourne la valeur.
Groupe d'entités de maillage.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Vue constante d'un tableau de type T.
ItemVectorViewT< Particle > ParticleVectorView
Vue sur un vecteur de particules.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Int32 Integer
Type représentant un entier.