14#include "arcane/mesh/NonBlockingParticleExchanger.h"
16#include "arcane/utils/List.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/IFunctor.h"
19#include "arcane/utils/PlatformUtils.h"
21#include "arcane/core/ItemGroup.h"
22#include "arcane/core/ItemVector.h"
23#include "arcane/core/IItemFamily.h"
24#include "arcane/core/IParticleFamily.h"
25#include "arcane/core/IParallelMng.h"
26#include "arcane/core/IVariableMng.h"
27#include "arcane/core/IVariable.h"
28#include "arcane/core/IMesh.h"
29#include "arcane/core/Item.h"
30#include "arcane/core/Timer.h"
31#include "arcane/core/SerializeMessage.h"
32#include "arcane/core/ISerializeMessageList.h"
33#include "arcane/core/CommonVariables.h"
34#include "arcane/core/FactoryService.h"
// Constructor: builds the exchanger from the service build info `sbi`.
// NOTE(review): this listing carries the original file's line numbers fused
// onto each line and skips some of them (49, 61, 70-75 here), so the first
// initialiser and the constructor body are not visible in this excerpt.
47NonBlockingParticleExchanger::
48NonBlockingParticleExchanger(
const ServiceBuildInfo& sbi)
50, m_item_family(nullptr)
// The parallel manager comes from the mesh attached to `sbi`; the rank is
// read from it once and cached. This presumes m_parallel_mng is declared
// before m_rank and m_timer in the class — TODO confirm in the header.
51, m_parallel_mng(sbi.mesh()->parallelMng())
52, m_rank(m_parallel_mng->commRank())
// Timer allocated with a raw `new`; the matching release is not visible in
// this excerpt — TODO confirm it is freed in the destructor.
53, m_timer(new Timer(m_parallel_mng->timerMng(),
"NonBlockingParticleExchanger",Timer::TimerReal))
54, m_total_time_functor(0.)
55, m_total_time_waiting(0.)
56, m_nb_total_particle_finish_exchange(0)
57, m_nb_total_particle(0)
58, m_nb_original_blocking_size(0)
// Copies the value initialised on the previous line; presumes
// m_nb_original_blocking_size is declared before m_nb_blocking_size —
// TODO confirm in the header.
59, m_nb_blocking_size(m_nb_original_blocking_size)
// The exchanger starts in the "finished" state.
60, m_exchange_finished(true)
62, m_need_general_receive(false)
63, m_end_message_sended(false)
64, m_can_process_messages(true)
65, m_can_process_non_blocking(false)
66, m_want_process_non_blocking(false)
// When true, _serializeMessage() adds the finished-particle count to each
// exchange message (see the m_want_fast_send_particles tests there).
67, m_want_fast_send_particles(true)
68, m_nb_receive_message(0)
69, m_nb_particle_finished_exchange(0)
// NOTE(review): the macro is tested with `#if` here but with `#ifdef` at
// original lines 562 and 626 — the two forms differ when the macro is
// defined as 0 or defined empty; confirm which is intended.
76#if ARCANE_DEBUG_EXCHANGE_ITEMS
// Destructor: reports messages or particles that are still queued when the
// exchanger is destroyed.
// NOTE(review): braces and several lines (86-88, 92-94, 97 onward) are
// missing from this excerpt, so what is done with `s` and any clean-up of
// owned resources is not shown here.
84NonBlockingParticleExchanger::
85~NonBlockingParticleExchanger()
89 if (!m_pending_messages.empty() || !m_waiting_messages.empty()){
// NOTE(review): two values are passed but the format string contains only
// {0}; "nb_waiting=" has no {1}, so the waiting count is presumably never
// printed — verify against String::format and add the placeholder.
90 String s = String::format(
"pending or waiting messages: nb_pending={0} nb_waiting=",
91 m_pending_messages.size(),m_waiting_messages.size());
95 if (!m_waiting_local_ids.empty()){
// NOTE(review): same problem — this format string has no placeholder at all
// for the size argument.
96 warning() << String::format(
"pending particles: nb_pending=",m_waiting_local_ids.size());
105void NonBlockingParticleExchanger::
// Releases every message held in m_accumulate_infos: each pointer is deleted
// and reset, then the container is emptied.
// NOTE(review): the line carrying the function name (original 115) is
// missing from this excerpt.
114void NonBlockingParticleExchanger::
117 for( Integer i=0, is=m_accumulate_infos.size(); i<is; ++i ){
118 delete m_accumulate_infos[i];
// Literal 0 is used as the null pointer value here.
119 m_accumulate_infos[i] = 0;
121 m_accumulate_infos.clear();
// Resets the per-exchange state before a new exchange starts.
// NOTE(review): the function name and parameter lines (original 128-132) and
// lines 138-141 are missing from this excerpt; in particular, where
// m_nb_total_particle gets its value is not visible here.
127void NonBlockingParticleExchanger::
// Clear the "end message sent" and "finished" flags.
133 m_end_message_sended =
false;
134 m_exchange_finished =
false;
// Restore the blocking size to its original value and zero the counters.
135 m_nb_blocking_size = m_nb_original_blocking_size;
136 m_nb_receive_message = 0;
137 m_nb_particle_finished_exchange = 0;
// Log the value of m_nb_total_particle together with the current date/time.
142 info() <<
"BEGIN TRACKING TOTAL FLYING = " << m_nb_total_particle
144 <<
" (Date=" << platform::getCurrentDateTime() <<
")";
146 m_nb_total_particle_finish_exchange = 0;
// Request that a general receive be posted (consumed by
// _checkNeedReceiveMessage()).
148 m_need_general_receive =
true;
// Rebuild the list of variables to exchange from the family's used
// variables, then sort it by name. The meaning of the `true` argument is not
// shown here — presumably the sort direction; TODO confirm. The same list is
// iterated by both _serializeMessage() and _deserializeMessage().
154 m_variables_to_exchange.clear();
155 m_item_family->usedVariables(m_variables_to_exchange);
156 m_variables_to_exchange.sortByName(
true);
162bool NonBlockingParticleExchanger::
176bool NonBlockingParticleExchanger::
191void NonBlockingParticleExchanger::
198 _sendPendingMessages();
204bool NonBlockingParticleExchanger::
213void NonBlockingParticleExchanger::
// Sends the particles listed in `local_ids` to the ranks given in
// `sub_domains_to_send`, then processes incoming messages.
//
// local_ids              local ids of the particles to send
// sub_domains_to_send    destination rank for each entry of local_ids
//                        (indexed in parallel, see _addItemsToSend())
// item_group             forwarded to _waitMessages()
// new_particle_local_ids forwarded to _waitMessages()
//
// Returns the value returned by _waitMessages().
//
// NOTE(review): the last parameter line (original 241, which must declare
// the `functor` used at line 258) and all braces / `else` lines are missing
// from this excerpt, so the exact nesting of the conditions below cannot be
// read from here.
237bool NonBlockingParticleExchanger::
238_exchangeItems(Int32ConstArrayView local_ids,
239 Int32ConstArrayView sub_domains_to_send,ItemGroup item_group,
240 Int32Array* new_particle_local_ids,
// In fast-send mode the finished-particle count is reported separately only
// when there is nothing to send.
243 if (m_want_fast_send_particles){
244 if (local_ids.empty())
245 _processFinishTrackingMessage();
248 if (!m_want_process_non_blocking)
249 _processFinishTrackingMessage();
// Once the exchange is finished: broadcast the final status and stop asking
// for a new general receive.
250 if (m_exchange_finished){
251 _sendFinishExchangeParticle();
252 m_need_general_receive =
false;
254 _checkNeedReceiveMessage();
256 _checkSendItems(local_ids,sub_domains_to_send);
// The first argument (pending particle count) is unused by _waitMessages().
258 return _waitMessages(0,item_group,new_particle_local_ids,functor);
// Processes messages until no more processing is requested or the exchange
// is finished.
//
// nb_pending_particle    unused (marked with ARCANE_UNUSED)
// item_group             forwarded to _processMessages()
// new_particle_local_ids forwarded to _processMessages()
// functor                forwarded to _processMessages() inside the loop
//
// Returns m_exchange_finished.
//
// NOTE(review): braces and some lines are missing from this excerpt (e.g.
// the statement guarded by the item_group null test at line 270).
264bool NonBlockingParticleExchanger::
265_waitMessages(Integer nb_pending_particle,ItemGroup item_group,
266 Int32Array* new_particle_local_ids,IFunctor* functor)
268 ARCANE_UNUSED(nb_pending_particle);
270 if (!item_group.null())
273 m_can_process_messages =
true;
274 m_can_process_non_blocking = m_want_process_non_blocking;
// The flag is cleared before each pass; it is set again by
// _processMessages() (no message finished) or by _deserializeMessage()
// (MESSAGE_NB_FINISH_EXCHANGE) when another pass is wanted.
275 while (m_can_process_messages && !m_exchange_finished){
276 m_can_process_messages =
false;
277 _processMessages(item_group,new_particle_local_ids,
false,functor);
280 if (m_exchange_finished){
281 info(5) <<
" ** EXCHANGE finished: ";
// Final pass: wait for all messages (wait_all=true) and pass no functor.
// NOTE(review): literal 0 is passed for the IFunctor* argument; nullptr
// would state the intent more clearly.
284 _processMessages(item_group,new_particle_local_ids,
true,0);
285 info(5) <<
" ** EXCHANGE finished END: ";
288 info(5) <<
" ** RETURN EXCHANGE m_exchange_finished: " << m_exchange_finished;
289 return m_exchange_finished;
// Queues one receive message in m_pending_messages when
// m_need_general_receive is set, then clears the flag.
295void NonBlockingParticleExchanger::
296_checkNeedReceiveMessage()
298 if (m_need_general_receive){
// The peer rank is A_NULL_RANK — presumably "receive from any rank";
// TODO confirm against SerializeMessage.
// The message is allocated with a raw `new`; where it is released is not
// visible in this excerpt.
299 auto sm =
new SerializeMessage(m_rank,A_NULL_RANK,ISerializeMessage::MT_Recv);
300 m_pending_messages.add(sm);
301 m_need_general_receive =
false;
// Builds one send message per communicating sub-domain for the particles in
// `local_ids` plus those buffered in m_waiting_local_ids, then removes the
// sent particles from the family.
//
// local_ids            local ids of the particles to send now
// sub_domains_to_send  destination rank for each entry of local_ids
//
// NOTE(review): braces and several declarations are missing from this
// excerpt (e.g. `communicating_sub_domains`, `items_to_send_uid`,
// `items_to_send_cells_uid`, and lines 351-353 / 357-361 of the send loop).
308void NonBlockingParticleExchanger::
309_generateSendItemsMessages(Int32ConstArrayView local_ids,
310 Int32ConstArrayView sub_domains_to_send)
312 Timer::Phase tphase(m_parallel_mng->timeStats(),TP_Communication);
314 IMesh* mesh = m_item_family->mesh();
// The candidate destination ranks are those the cell family communicates with.
317 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
319 Integer nb_connected_sub_domain = communicating_sub_domains.size();
// One list of local ids per communicating sub-domain, same indexing as
// communicating_sub_domains.
321 UniqueArray< SharedArray<Int32> > ids_to_send(nb_connected_sub_domain);
// NOTE(review): clear() on a container of raw pointers does not delete them;
// presumably the container is already empty at this point — TODO confirm.
324 m_accumulate_infos.clear();
325 m_accumulate_infos.resize(nb_connected_sub_domain);
326 for( Integer i=0; i<nb_connected_sub_domain; ++i )
327 m_accumulate_infos[i] =
new SerializeMessage(m_rank,communicating_sub_domains[i],
328 ISerializeMessage::MT_Send);
// Distribute both the new particles and the buffered ones.
330 _addItemsToSend(local_ids,sub_domains_to_send,communicating_sub_domains,ids_to_send);
331 _addItemsToSend(m_waiting_local_ids,m_waiting_sub_domains_to_send,
332 communicating_sub_domains,ids_to_send);
// NOTE(review): whatever guards these log statements (lines 333-335, 338)
// is missing from this excerpt.
336 info() <<
"-- Subdomain " << m_rank <<
". NB to send: " << local_ids.size()
337 <<
" NB connected subdomains: " << nb_connected_sub_domain;
339 info() <<
"NB connected subdomain for " << m_rank <<
" : " << m_accumulate_infos.size();
340 for( Integer i=0, n=m_accumulate_infos.size(); i<n; ++i ){
341 info() <<
"------------- Send: rank=" << m_accumulate_infos[i]->destRank()
342 <<
" n=" << ids_to_send[i].size();
349 for( Integer j=0; j<nb_connected_sub_domain; ++j ){
350 ISerializeMessage* sm = m_accumulate_infos[j];
// Only non-empty lists are serialised; _serializeMessage() also queues `sm`
// in m_pending_messages.
// NOTE(review): what happens to `sm` when the list is empty is not visible
// here (lines 357-361 missing) — confirm it is released, since the container
// is cleared just below without deleting its elements.
354 if (!ids_to_send[j].empty())
355 _serializeMessage(sm,ids_to_send[j],items_to_send_uid,
356 items_to_send_cells_uid);
362 m_accumulate_infos.clear();
365 info(5)<<
"NonBlockingParticleExchanger:: sendItems " <<
"local_ids "<<local_ids.size();
366 info(5)<<
"NonBlockingParticleExchanger:: sendItems " <<
"m_waiting_local_ids "<<m_waiting_local_ids.size();
// The sent particles no longer belong to this rank: remove them, finalise
// the family update, and empty the waiting buffers.
368 m_item_family->toParticleFamily()->removeParticles(local_ids);
369 m_item_family->toParticleFamily()->removeParticles(m_waiting_local_ids);
370 m_item_family->endUpdate();
371 m_waiting_local_ids.clear();
372 m_waiting_sub_domains_to_send.clear();
// Appends each local id to the send list of its destination sub-domain.
//
// local_ids                  local ids of the particles to send
// sub_domains_to_send        destination rank for each entry of local_ids
// communicating_sub_domains  ranks this sub-domain can send to
// ids_to_send                output: one id list per entry of
//                            communicating_sub_domains (same indexing)
//
// Calls fatal() when a particle targets its own rank or a rank that is not
// in communicating_sub_domains.
//
// NOTE(review): braces and the body of the search loop (lines 404-407, where
// sd_index is presumably assigned) are missing from this excerpt.
378void NonBlockingParticleExchanger::
379_addItemsToSend(Int32ConstArrayView local_ids,
380 Int32ConstArrayView sub_domains_to_send,
381 Int32ConstArrayView communicating_sub_domains,
382 UniqueArray<SharedArray<Int32> >& ids_to_send)
384 String func_name(
"NonBlockingParticleExchanger::_addItemsToSend()");
385 Integer nb_connected_sub_domain = ids_to_send.size();
388 Integer id_size = local_ids.size();
389 for( Integer i=0; i<id_size; ++i ){
390 Int32 item_local_id = local_ids[i];
391 Integer sd_to_send = sub_domains_to_send[i];
393 if (sd_to_send==m_rank)
395 fatal() << func_name <<
"The entity with local id " << item_local_id
396 <<
" should not be sent to its own subdomain";
// nb_connected_sub_domain is used as the "not found" sentinel.
401 Integer sd_index = nb_connected_sub_domain;
// Linear search of the destination rank, done once per particle.
402 for( Integer i_sd=0; i_sd<nb_connected_sub_domain; ++i_sd )
403 if (sd_to_send==communicating_sub_domains[i_sd]){
408 if (sd_index==nb_connected_sub_domain)
409 fatal() << func_name <<
"Internal: bad subdomain index";
411 ids_to_send[sd_index].add(item_local_id);
// Sends the queued messages, runs the optional functor, waits on the message
// list, then walks the messages that were waiting.
//
// item_group             forwarded to _deserializeMessage()
// new_particle_local_ids forwarded to _deserializeMessage()
// wait_all               selects the wait mode (see the note at line 438)
// functor                work executed while messages are in flight
//
// NOTE(review): braces, `if`/`else` lines and some declarations (e.g.
// `items_to_create_id`, `items_to_create_cells_id`) are missing from this
// excerpt, so the conditions guarding several statements below are not
// visible.
418void NonBlockingParticleExchanger::
419_processMessages(ItemGroup item_group,Int32Array* new_particle_local_ids,
420 bool wait_all,IFunctor* functor)
422 _sendPendingMessages();
// Time the functor and accumulate it in m_total_time_functor.
// NOTE(review): _waitMessages() passes 0 for `functor`, so this dereference
// must be guarded by a null test on a missing line — TODO confirm.
426 Timer::Sentry ts(m_timer);
427 functor->executeFunctor();
429 m_total_time_functor += m_timer->lastActivationTime();
430 info(5) <<
"TimeFunctor: current=" << m_timer->lastActivationTime()
431 <<
" total=" << m_total_time_functor;
434 Integer nb_message_finished = 0;
436 Timer::Sentry ts(m_timer);
// Three wait modes are used: WaitAll, WaitSomeNonBlocking (when
// m_can_process_non_blocking is set) and WaitSome. Presumably WaitAll is
// selected by `wait_all` — the selecting `if`/`else` lines are missing.
438 nb_message_finished = m_message_list->waitMessages(Parallel::WaitAll);
440 if (m_can_process_non_blocking)
441 nb_message_finished = m_message_list->waitMessages(Parallel::WaitSomeNonBlocking);
443 nb_message_finished = m_message_list->waitMessages(Parallel::WaitSome);
// Nothing completed: leave non-blocking mode, request another pass and
// report the finished-particle count.
445 if (nb_message_finished==0){
446 m_can_process_non_blocking =
false;
447 m_can_process_messages =
true;
448 _processFinishTrackingMessage();
453 m_total_time_waiting += m_timer->lastActivationTime();
454 info(5) <<
"TimeWaiting: current=" << m_timer->lastActivationTime()
455 <<
" total=" << m_total_time_waiting;
// Work on a copy so that m_waiting_messages can be refilled inside the loop.
459 UniqueArray<ISerializeMessage*> current_messages(m_waiting_messages);
460 m_waiting_messages.clear();
464 for( Integer i=0, is=current_messages.size(); i<is; ++i ){
465 ISerializeMessage* sm = current_messages[i];
// Presumably completed messages are deserialised and counted while the
// others are put back in the waiting list — the test that separates the two
// cases, and any release of a completed message, are on missing lines.
468 _deserializeMessage(sm,items_to_create_id,items_to_create_cells_id,item_group,new_particle_local_ids);
469 ++m_nb_receive_message;
474 m_waiting_messages.add(sm);
// Hands every message of m_pending_messages to the message list and moves it
// to m_waiting_messages.
//
// NOTE(review): braces and the statement guarded by the m_message_list test
// (line 489, presumably the creation of the list from `pm`) are missing from
// this excerpt.
481void NonBlockingParticleExchanger::
482_sendPendingMessages()
484 IParallelMng* pm = m_parallel_mng;
// Make sure the general receive is queued before sending.
486 _checkNeedReceiveMessage();
488 if (!m_message_list.get())
492 Timer::Sentry ts(m_timer);
494 Integer nb_message = m_pending_messages.size();
495 for( Integer i=0; i<nb_message; ++i ){
496 m_message_list->addMessage(m_pending_messages[i]);
497 m_waiting_messages.add(m_pending_messages[i]);
499 m_message_list->processPendingMessages();
500 m_pending_messages.clear();
// The value logged as "buffersize" is the size of m_waiting_local_ids.
502 info(5) <<
"TimeSendMessages=" << m_timer->lastActivationTime()
503 <<
" buffersize=" << m_waiting_local_ids.size();
// Fills the serializer of `sm` with a MESSAGE_EXCHANGE payload for the
// particles in `acc_ids`, then queues `sm` in m_pending_messages.
//
// sm                       message whose serializer is filled
// acc_ids                  local ids of the particles to send
// items_to_send_uid        scratch array, resized here and filled with
//                          particle unique ids
// items_to_send_cells_uid  scratch array, resized here and filled with the
//                          unique id of each particle's cell
//
// The serializer is driven in two passes that must list the same fields in
// the same order: ModeReserve, then allocateBuffer(), then ModePut.
//
// NOTE(review): braces and some lines are missing from this excerpt.
509void NonBlockingParticleExchanger::
510_serializeMessage(ISerializeMessage* sm,
511 Int32ConstArrayView acc_ids,
512 Int64Array& items_to_send_uid,
513 Int64Array& items_to_send_cells_uid)
515 ParticleInfoListView internal_items(m_item_family);
517 ISerializer* sbuf = sm->serializer();
518 sbuf->setMode(ISerializer::ModeReserve);
522 Integer nb_item = acc_ids.size();
// Pass 1 (reserve): message type; finished-particle count (fast-send mode
// only); sender rank; particle count; unique ids; cell unique ids; then
// every exchanged variable.
524 sbuf->reserveInteger(1);
525 if (m_want_fast_send_particles){
527 sbuf->reserveInt64(1);
530 sbuf->reserveInt32(1);
532 sbuf->reserveInt64(1);
534 sbuf->reserveSpan(eBasicDataType::Int64,nb_item);
536 sbuf->reserveSpan(eBasicDataType::Int64,nb_item);
538 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
539 IVariable* var = *i_var;
540 var->serialize(sbuf,acc_ids);
// Pass 2 (put): the same fields, in the same order.
544 sbuf->allocateBuffer();
545 sbuf->setMode(ISerializer::ModePut);
547 sbuf->putInteger(MESSAGE_EXCHANGE);
548 if (m_want_fast_send_particles){
// The local finished count travels with this message and is then reset; the
// receiver adds it to its own counter (see _deserializeMessage()).
549 sbuf->putInt64(m_nb_particle_finished_exchange);
550 m_nb_particle_finished_exchange = 0;
552 sbuf->putInt32(m_rank);
553 sbuf->putInt64(nb_item);
554 items_to_send_uid.resize(nb_item);
555 items_to_send_cells_uid.resize(nb_item);
557 for( Integer z=0; z<nb_item; ++z ){
558 Particle item = internal_items[acc_ids[z]];
559 items_to_send_uid[z] = item.uniqueId();
// A particle without a cell is sent with NULL_ITEM_UNIQUE_ID as cell id.
560 bool has_cell = item.hasCell();
561 items_to_send_cells_uid[z] = (has_cell) ? item.cell().uniqueId() : NULL_ITEM_UNIQUE_ID;
562#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
// NOTE(review): the debug log reads item.cell().owner() without testing
// has_cell in the visible lines — confirm this is safe for particles that
// have no cell.
564 info() <<
"Particle BufID=" << acc_ids[z]
565 <<
" LID=" << item.localId()
566 <<
" UID=" << items_to_send_uid[z]
567 <<
" CellIUID=" << items_to_send_cells_uid[z]
568 <<
" (owner=" << item.cell().owner() <<
")";
572 sbuf->putSpan(items_to_send_uid);
573 sbuf->putSpan(items_to_send_cells_uid);
575 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
576 IVariable* var = *i_var;
577 var->serialize(sbuf,acc_ids);
// The message is now ready; _sendPendingMessages() will post it.
580 m_pending_messages.add(sm);
// Reads one received message and acts on it according to its type.
//
// message                    received message to decode
// items_to_create_unique_id  scratch array, resized and filled with the
//                            unique ids of the received particles
// items_to_create_cells_id   scratch array, resized and filled with the
//                            unique ids of their cells
// item_group                 when not null, receives the created particles
// new_particle_local_ids     when not null, receives the local ids of the
//                            created particles
//
// Message types handled: MESSAGE_EXCHANGE, MESSAGE_NB_FINISH_EXCHANGE,
// MESSAGE_FINISH_EXCHANGE_STATUS, MESSAGE_CHANGE_BLOCKING.
//
// NOTE(review): braces, `break` lines and some declarations (e.g.
// `items_to_create_local_id`, `cells_lid`) are missing from this excerpt.
586void NonBlockingParticleExchanger::
587_deserializeMessage(ISerializeMessage* message,
588 Int64Array& items_to_create_unique_id,
589 Int64Array& items_to_create_cells_id,
590 ItemGroup item_group,
591 Int32Array* new_particle_local_ids)
594 IMesh* mesh = m_item_family->mesh();
595 ISerializer* sbuf = message->serializer();
598 sbuf->setMode(ISerializer::ModeGet);
599 sbuf->setReadMode(ISerializer::ReadReplace);
// Every message starts with its type (written with putInteger by the sender).
603 Integer message_type = sbuf->getInteger();
604 info(4) <<
"Deserialise message_type=" << (int)message_type;
605 switch(message_type){
// Particle payload: fields are read in the order _serializeMessage() wrote
// them.
606 case MESSAGE_EXCHANGE:
// A general receive has been consumed; ask for a new one.
608 m_need_general_receive =
true;
609 if (m_want_fast_send_particles){
// Finished-particle count carried with the particles; it is added to the
// local counter.
610 Int64 nb_finished = sbuf->getInt64();
611 m_nb_particle_finished_exchange += nb_finished;
613 Int32 orig_rank = sbuf->getInt32();
614 Int64 nb_item = sbuf->getInt64();
// NOTE(review): the value labelled "orig_rank=" is message->destination(),
// while the rank read from the buffer is printed as "rank=" — the labels
// look mismatched; confirm.
616 info() <<
"------------- Receive: rank=" << orig_rank <<
" particle nb=" << nb_item
617 <<
" (orig_rank=" << message->destination() <<
")";
621 items_to_create_local_id.resize(nb_item);
622 items_to_create_unique_id.resize(nb_item);
623 items_to_create_cells_id.resize(nb_item);
624 sbuf->getSpan(items_to_create_unique_id);
625 sbuf->getSpan(items_to_create_cells_id);
626#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
629 for( Integer z=0; z<nb_item; ++z ){
630 info() <<
"Particle UID=" << items_to_create_unique_id[z]
631 <<
" CellIUID=" << items_to_create_cells_id[z];
// Convert the received cell unique ids to local ids of this sub-domain.
635 cells_lid.resize(nb_item);
636 mesh->cellFamily()->itemsUniqueIdToLocalId(cells_lid,items_to_create_cells_id);
// NOTE(review): items_to_create_local_id was already resized to nb_item at
// line 621; this second resize looks redundant.
638 items_to_create_local_id.resize(nb_item);
// Create the particles in the family. The middle argument line (640) is
// missing from this excerpt.
639 ParticleVectorView particles_view = m_item_family->toParticleFamily()->addParticles(items_to_create_unique_id,
641 items_to_create_local_id);
642 info(5) <<
"Nb create=" << particles_view.size();
646 m_item_family->endUpdate();
650 ParticleInfoListView internal_items(m_item_family);
// The received particles now belong to this rank.
654 for( Integer z=0; z<nb_item; ++z ){
655 Particle item = internal_items[items_to_create_local_id[z]];
658 item.mutableItemBase().setOwner(m_rank,m_rank);
// Both outputs are optional.
660 if (!item_group.null())
661 item_group.addItems(items_to_create_local_id,
false);
662 if (new_particle_local_ids)
663 new_particle_local_ids->addRange(items_to_create_local_id);
// Read the variable values for the created particles, iterating the same
// variable list as the sender.
665 for( VariableCollection::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
666 IVariable* var = *i_var;
667 var->serialize(sbuf,items_to_create_local_id);
// A rank reports how many of its particles have finished (sent by
// _processFinishTrackingMessage()).
671 case MESSAGE_NB_FINISH_EXCHANGE:
673 m_need_general_receive =
true;
// Request another processing pass in _waitMessages().
676 m_can_process_messages =
true;
677 Int64 nb_particle = sbuf->getInt64();
678 Int32 orig_rank = sbuf->getInt32();
680 info() <<
"MESSAGE_NB_FINISH_EXCHANGE nb=" << nb_particle <<
" (from rank=" << orig_rank <<
")";
681 _addFinishExchangeParticle(nb_particle);
// Final status sent by the master rank: the exchange is finished when the
// received total equals m_nb_total_particle.
// NOTE(review): unlike the other cases, no line visible here sets
// m_need_general_receive.
684 case MESSAGE_FINISH_EXCHANGE_STATUS:
686 m_nb_total_particle_finish_exchange = sbuf->getInt64();
687 m_exchange_finished = (m_nb_total_particle_finish_exchange==m_nb_total_particle);
689 info() <<
"** RECEIVING FINISH EXCHANGE " << m_exchange_finished
690 <<
" finish=" << m_nb_total_particle_finish_exchange
691 <<
" total=" << m_nb_total_particle;
// New blocking size sent by the master rank.
697 case MESSAGE_CHANGE_BLOCKING:
699 m_need_general_receive =
true;
701 Integer nb_blocking_size = sbuf->getInteger();
// The blocking size is only ever lowered by this message.
705 if (nb_blocking_size<m_nb_blocking_size)
706 m_nb_blocking_size = nb_blocking_size;
707 info(4) <<
"** RECEIVING CHANGE BLOCKING"
708 <<
" new_blocking_size=" << m_nb_blocking_size;
// NOTE(review): the statement guarded by this test is on a missing line.
711 if (m_waiting_local_ids.size()>0)
721void NonBlockingParticleExchanger::
724 if (!m_waiting_messages.empty())
// Reports the local count of finished particles
// (m_nb_particle_finished_exchange) to the master rank, then resets it.
//
// NOTE(review): braces and the `else` line are missing from this excerpt;
// presumably the message branch runs only on non-master ranks.
732void NonBlockingParticleExchanger::
733_processFinishTrackingMessage()
// On the master rank the count is added directly, without a message.
742 if (m_rank==m_master_proc){
743 _addFinishExchangeParticle(m_nb_particle_finished_exchange);
// Nothing is sent when the count is zero.
747 if (m_nb_particle_finished_exchange!=0) {
748 info(4) <<
"Send to master proc (" << m_master_proc <<
") nb_finish=" << m_nb_particle_finished_exchange;
749 SerializeMessage* sm =
new SerializeMessage(m_rank,m_master_proc,ISerializeMessage::MT_Send);
750 ISerializer* sbuf = sm->serializer();
// Payload: message type, finished count, sender rank — read back in that
// order by the MESSAGE_NB_FINISH_EXCHANGE case of _deserializeMessage().
751 sbuf->setMode(ISerializer::ModeReserve);
752 sbuf->reserveInteger(1);
753 sbuf->reserveInt64(1);
754 sbuf->reserveInt32(1);
755 sbuf->allocateBuffer();
756 sbuf->setMode(ISerializer::ModePut);
757 sbuf->putInteger(MESSAGE_NB_FINISH_EXCHANGE);
758 sbuf->putInt64(m_nb_particle_finished_exchange);
759 sbuf->putInt32(m_rank);
760 m_pending_messages.add(sm);
763 m_nb_particle_finished_exchange = 0;
// Queues a MESSAGE_FINISH_EXCHANGE_STATUS message, carrying
// m_nb_total_particle_finish_exchange, for the other ranks.
//
// NOTE(review): braces and the statements guarded by the two tests below
// (lines 774 and 779) are missing from this excerpt; presumably an early
// `return` and a `continue` that skips the master rank — TODO confirm.
769void NonBlockingParticleExchanger::
770_sendFinishExchangeParticle()
772 Int32 nb_rank = m_parallel_mng->commSize();
// Tests "not the master rank, or end message already sent".
773 if (m_rank!=m_master_proc || m_end_message_sended)
775 m_end_message_sended =
true;
776 info(4) <<
" ** ** SEND FINISH EXCHANGE PARTICLE2";
777 for( Integer i=0; i<nb_rank; ++i ){
778 if (i==m_master_proc)
780 SerializeMessage* sm =
new SerializeMessage(m_rank,i,ISerializeMessage::MT_Send);
781 ISerializer* sbuf = sm->serializer();
// Payload: message type, then the total finished count.
782 sbuf->setMode(ISerializer::ModeReserve);
783 sbuf->reserveInteger(1);
784 sbuf->reserveInt64(1);
785 sbuf->allocateBuffer();
786 sbuf->setMode(ISerializer::ModePut);
787 sbuf->putInteger(MESSAGE_FINISH_EXCHANGE_STATUS);
788 sbuf->putInt64(m_nb_total_particle_finish_exchange);
789 m_pending_messages.add(sm);
// Adds `nb_particle_finish_exchange` to the running total of finished
// particles and reacts to the number still remaining.
//
// nb_particle_finish_exchange  number of newly finished particles
//
// - none remaining: marks the exchange finished and sends the final status;
// - fewer remaining than m_nb_blocking_size * number of ranks: sets the
//   blocking size to 0 and queues MESSAGE_CHANGE_BLOCKING messages.
//
// NOTE(review): braces and the statements guarded by the tests at lines 825
// and 828 are missing from this excerpt.
796void NonBlockingParticleExchanger::
797_addFinishExchangeParticle(
Int64 nb_particle_finish_exchange)
799 m_nb_total_particle_finish_exchange += nb_particle_finish_exchange;
800 Int32 nb_rank = m_parallel_mng->commSize();
// Widened to Int64 so that the product at line 815 is computed in 64 bits.
801 Int64 nb_rank_as_int64 = nb_rank;
803 info(4) <<
"** RECEIVING FINISH EXCHANGE n=" << nb_particle_finish_exchange
804 <<
" totalfinish=" << m_nb_total_particle_finish_exchange
805 <<
" total=" << m_nb_total_particle;
807 Int64 remaining_particle = m_nb_total_particle - m_nb_total_particle_finish_exchange;
808 if (remaining_particle==0){
809 m_exchange_finished =
true;
810 m_need_general_receive =
false;
811 info() <<
"** ** FINISH TRACKING NB_RECV=" << m_nb_receive_message
812 <<
" (Date=" << platform::getCurrentDateTime() <<
")";
813 _sendFinishExchangeParticle();
815 else if (remaining_particle<(m_nb_blocking_size*nb_rank_as_int64)){
818 m_nb_blocking_size = 0;
// NOTE(review): "REMAING_PARTICLE" in the message below looks like a typo
// for "REMAINING_PARTICLE"; it is a runtime string, so it is left untouched
// here.
819 warning() <<
"** ** CHANGE BLOCKING NEW_SIZE " << m_nb_blocking_size
820 <<
" REMAING_PARTICLE " << remaining_particle
821 <<
" (Date=" << platform::getCurrentDateTime() <<
")";
825 if (m_waiting_local_ids.size()>0)
// Queue the new blocking size for the other ranks; presumably the master
// rank is skipped by the test at line 828 — TODO confirm.
827 for(
Int32 i=0; i<nb_rank; ++i ){
828 if (i==m_master_proc)
830 SerializeMessage* sm =
new SerializeMessage(m_rank,i,ISerializeMessage::MT_Send);
831 ISerializer* sbuf = sm->serializer();
// Payload: message type, then the new blocking size (both as Integer).
832 sbuf->setMode(ISerializer::ModeReserve);
833 sbuf->reserveInteger(1);
834 sbuf->reserveInteger(1);
835 sbuf->allocateBuffer();
836 sbuf->setMode(ISerializer::ModePut);
837 sbuf->putInteger(MESSAGE_CHANGE_BLOCKING);
838 sbuf->putInteger(m_nb_blocking_size);
839 m_pending_messages.add(sm);
848void NonBlockingParticleExchanger::
860 NonBlockingParticleExchanger);
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
Tableau d'items de types quelconques.
Interface d'une famille d'entités.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Ref< ISerializeMessageList > createSerializeMessageListRef()=0
Créé une liste pour gérer les 'ISerializeMessage'.
virtual char reduce(eReduceType rt, char v)=0
Effectue la réduction de type rt sur le réel v et retourne la valeur.
Groupe d'entités de maillage.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Vue constante d'un tableau de type T.
ItemVectorViewT< Particle > ParticleVectorView
Vue sur un vecteur de particules.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Int32 Integer
Type représentant un entier.