Arcane  v3.16.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
NonBlockingParticleExchanger.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* NonBlockingParticleExchanger.cc (C) 2000-2025 */
9/* */
10/* Echangeur de particules. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/mesh/NonBlockingParticleExchanger.h"
15
16#include "arcane/utils/List.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/IFunctor.h"
19#include "arcane/utils/PlatformUtils.h"
20
21#include "arcane/core/ItemGroup.h"
22#include "arcane/core/ItemVector.h"
23#include "arcane/core/IItemFamily.h"
24#include "arcane/core/IParticleFamily.h"
25#include "arcane/core/IParallelMng.h"
26#include "arcane/core/IVariableMng.h"
27#include "arcane/core/IVariable.h"
28#include "arcane/core/IMesh.h"
29#include "arcane/core/Item.h"
30#include "arcane/core/Timer.h"
31#include "arcane/core/ISerializeMessageList.h"
32#include "arcane/core/CommonVariables.h"
33#include "arcane/core/FactoryService.h"
34#include "arcane/core/internal/SerializeMessage.h"
35
36//#define ARCANE_DEBUG_EXCHANGE_ITEMS
37
38/*---------------------------------------------------------------------------*/
39/*---------------------------------------------------------------------------*/
40
41namespace Arcane::mesh
42{
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
46
47NonBlockingParticleExchanger::
48NonBlockingParticleExchanger(const ServiceBuildInfo& sbi)
49: BasicService(sbi)
50, m_item_family(nullptr)
51, m_parallel_mng(sbi.mesh()->parallelMng())
52, m_rank(m_parallel_mng->commRank())
53, m_timer(new Timer(m_parallel_mng->timerMng(),"NonBlockingParticleExchanger",Timer::TimerReal))
54, m_total_time_functor(0.)
55, m_total_time_waiting(0.)
56, m_nb_total_particle_finish_exchange(0)
57, m_nb_total_particle(0)
58, m_nb_original_blocking_size(0)
59, m_nb_blocking_size(m_nb_original_blocking_size)
60, m_exchange_finished(true)
61, m_master_proc(0)
62, m_need_general_receive(false)
63, m_end_message_sended(false)
64, m_can_process_messages(true)
65, m_can_process_non_blocking(false)
66, m_want_process_non_blocking(false)
67, m_want_fast_send_particles(true)
68, m_nb_receive_message(0)
69, m_nb_particle_finished_exchange(0)
70, m_verbose_level(1)
71, m_is_debug(false)
72{
73 // m_want_fast_send_particles permet d'envoyer en même temps que
74 // les particules le nombre qui ont finie la poursuite. Cela permet
75 // d'éviter des messages supplémentaires.
76#if ARCANE_DEBUG_EXCHANGE_ITEMS
77 m_is_debug = true;
78#endif
79}
80
81/*---------------------------------------------------------------------------*/
82/*---------------------------------------------------------------------------*/
83
84NonBlockingParticleExchanger::
85~NonBlockingParticleExchanger()
86{
87 // On ne peut pas faire de throw dans le destructeur donc on affiche
88 // des avertissements de compilation
89 if (!m_pending_messages.empty() || !m_waiting_messages.empty()){
90 String s = String::format("pending or waiting messages: nb_pending={0} nb_waiting=",
91 m_pending_messages.size(),m_waiting_messages.size());
92 warning() << s;
93 }
94
95 if (!m_waiting_local_ids.empty()){
96 warning() << String::format("pending particles: nb_pending=",m_waiting_local_ids.size());
97 }
98
99 delete m_timer;
100}
101
102/*---------------------------------------------------------------------------*/
103/*---------------------------------------------------------------------------*/
104
105void NonBlockingParticleExchanger::
106initialize(IItemFamily* item_family)
107{
108 m_item_family = item_family;
109}
110
111/*---------------------------------------------------------------------------*/
112/*---------------------------------------------------------------------------*/
113
114void NonBlockingParticleExchanger::
115_clearMessages()
116{
117 for( Integer i=0, is=m_accumulate_infos.size(); i<is; ++i ){
118 delete m_accumulate_infos[i];
119 m_accumulate_infos[i] = 0;
120 }
121 m_accumulate_infos.clear();
122}
123
124/*---------------------------------------------------------------------------*/
125/*---------------------------------------------------------------------------*/
126
127void NonBlockingParticleExchanger::
128beginNewExchange(Integer i_nb_particle)
129{
130 _checkInitialized();
131 IParallelMng* pm = m_parallel_mng;
132
133 m_end_message_sended = false;
134 m_exchange_finished = false;
135 m_nb_blocking_size = m_nb_original_blocking_size;
136 m_nb_receive_message = 0;
137 m_nb_particle_finished_exchange = 0;
138
139 //TODO: utiliser un tag spécifique pour cet échange.
140 Int64 nb_particle = i_nb_particle;
141 m_nb_total_particle = pm->reduce(Parallel::ReduceSum,nb_particle);
142 info() << "BEGIN TRACKING TOTAL FLYING = " << m_nb_total_particle
143 << " (local=" << nb_particle << ") "
144 << " (Date=" << platform::getCurrentDateTime() << ")";
145
146 m_nb_total_particle_finish_exchange = 0;
147
148 m_need_general_receive = true;
149
150 // Récupère la liste des variables à transferer.
151 // Il s'agit des variables qui ont la même famille que celle passée
152 // en paramètre.
153 // IMPORTANT: tous les sous-domaines doivent avoir ces mêmes variables
155 m_item_family->usedVariables(m_variables_to_exchange);
156 m_variables_to_exchange.sortByName(true);
157}
158
159/*---------------------------------------------------------------------------*/
160/*---------------------------------------------------------------------------*/
161
162bool NonBlockingParticleExchanger::
163exchangeItems(Integer nb_particle_finish_exchange,
164 Int32ConstArrayView local_ids,
165 Int32ConstArrayView sub_domains_to_send,ItemGroup item_group,
166 IFunctor* functor)
167{
168 //TODO a supprimer si passe par sendItems()
169 m_nb_particle_finished_exchange += nb_particle_finish_exchange;
170 return _exchangeItems(local_ids,sub_domains_to_send,item_group,0,functor);
171}
172
173/*---------------------------------------------------------------------------*/
174/*---------------------------------------------------------------------------*/
175
176bool NonBlockingParticleExchanger::
177exchangeItems(Integer nb_particle_finish_exchange,
178 Int32ConstArrayView local_ids,
179 Int32ConstArrayView sub_domains_to_send,
180 Int32Array* new_particle_local_ids,
181 IFunctor* functor)
182{
183 //TODO a supprimer si passe par sendItems()
184 m_nb_particle_finished_exchange += nb_particle_finish_exchange;
185 return _exchangeItems(local_ids,sub_domains_to_send,ItemGroup(),new_particle_local_ids,functor);
186}
187
188/*---------------------------------------------------------------------------*/
189/*---------------------------------------------------------------------------*/
190
191void NonBlockingParticleExchanger::
192sendItems(Integer nb_particle_finish_exchange,
193 Int32ConstArrayView local_ids,
194 Int32ConstArrayView sub_domains_to_send)
195{
196 m_nb_particle_finished_exchange += nb_particle_finish_exchange;
197 _checkSendItems(local_ids,sub_domains_to_send);
198 _sendPendingMessages();
199}
200
201/*---------------------------------------------------------------------------*/
202/*---------------------------------------------------------------------------*/
203
204bool NonBlockingParticleExchanger::
205waitMessages(Integer nb_pending_particle,Int32Array* new_particle_local_ids,IFunctor* functor)
206{
207 return _waitMessages(nb_pending_particle,ItemGroup(),new_particle_local_ids,functor);
208}
209
210/*---------------------------------------------------------------------------*/
211/*---------------------------------------------------------------------------*/
212
213void NonBlockingParticleExchanger::
214_checkSendItems(Int32ConstArrayView local_ids,
215 Int32ConstArrayView sub_domains_to_send)
216{
217 Integer nb_particle = local_ids.size();
218 Integer nb_waiting_local_ids = m_waiting_local_ids.size();
219 Integer nb_waiting_sub_domains_to_send = m_waiting_sub_domains_to_send.size();
220 if ((nb_particle+nb_waiting_local_ids)>=m_nb_blocking_size){
221 _generateSendItemsMessages(local_ids,sub_domains_to_send);
222 }
223 else{
224 // Met les particules dans un tampon avant de les envoyer
225 m_waiting_local_ids.resize(nb_waiting_local_ids+nb_particle);
226 m_waiting_sub_domains_to_send.resize(nb_waiting_sub_domains_to_send+nb_particle);
227 for( Integer i=0; i<nb_particle; ++i ){
228 m_waiting_local_ids[nb_waiting_local_ids+i] = local_ids[i];
229 m_waiting_sub_domains_to_send[nb_waiting_sub_domains_to_send+i] = sub_domains_to_send[i];
230 }
231 }
232}
233
234/*---------------------------------------------------------------------------*/
235/*---------------------------------------------------------------------------*/
236
237bool NonBlockingParticleExchanger::
238_exchangeItems(Int32ConstArrayView local_ids,
239 Int32ConstArrayView sub_domains_to_send,ItemGroup item_group,
240 Int32Array* new_particle_local_ids,
241 IFunctor* functor)
242{
243 if (m_want_fast_send_particles){
244 if (local_ids.empty())
245 _processFinishTrackingMessage();
246 }
247 else
248 if (!m_want_process_non_blocking)
249 _processFinishTrackingMessage();
250 if (m_exchange_finished){
251 _sendFinishExchangeParticle();
252 m_need_general_receive = false;
253 }
254 _checkNeedReceiveMessage();
255
256 _checkSendItems(local_ids,sub_domains_to_send);
257
258 return _waitMessages(0,item_group,new_particle_local_ids,functor);
259}
260
261/*---------------------------------------------------------------------------*/
262/*---------------------------------------------------------------------------*/
263
264bool NonBlockingParticleExchanger::
265_waitMessages(Integer nb_pending_particle,ItemGroup item_group,
266 Int32Array* new_particle_local_ids,IFunctor* functor)
267{
268 ARCANE_UNUSED(nb_pending_particle);
269
270 if (!item_group.null())
271 item_group.clear();
272
273 m_can_process_messages = true;
274 m_can_process_non_blocking = m_want_process_non_blocking;
275 while (m_can_process_messages && !m_exchange_finished){
276 m_can_process_messages = false;
277 _processMessages(item_group,new_particle_local_ids,false,functor);
278 }
279
280 if (m_exchange_finished){
281 info(5) << " ** EXCHANGE finished: ";
282 // Cela est nécessaire pour être certain que les messages
283 // de fin sont bien tous réceptionnés
284 _processMessages(item_group,new_particle_local_ids,true,0);
285 info(5) << " ** EXCHANGE finished END: ";
286 }
287
288 info(5) << " ** RETURN EXCHANGE m_exchange_finished: " << m_exchange_finished;
289 return m_exchange_finished;
290}
291
292/*---------------------------------------------------------------------------*/
293/*---------------------------------------------------------------------------*/
294
295void NonBlockingParticleExchanger::
296_checkNeedReceiveMessage()
297{
298 if (m_need_general_receive){
299 auto sm = new SerializeMessage(m_rank,A_NULL_RANK,ISerializeMessage::MT_Recv);
300 m_pending_messages.add(sm);
301 m_need_general_receive = false;
302 }
303}
304
305/*---------------------------------------------------------------------------*/
306/*---------------------------------------------------------------------------*/
307
308void NonBlockingParticleExchanger::
309_generateSendItemsMessages(Int32ConstArrayView local_ids,
310 Int32ConstArrayView sub_domains_to_send)
311{
312 Timer::Phase tphase(m_parallel_mng->timeStats(),TP_Communication);
313
314 IMesh* mesh = m_item_family->mesh();
315
316 Int32UniqueArray communicating_sub_domains;
317 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
318
319 Integer nb_connected_sub_domain = communicating_sub_domains.size();
320 //Integer max_sub_domain_id = 0;
321 UniqueArray< SharedArray<Int32> > ids_to_send(nb_connected_sub_domain);
322 // Infos pour chaque sous-domaine connecté
323 //_clearMessages();
324 m_accumulate_infos.clear();
325 m_accumulate_infos.resize(nb_connected_sub_domain);
326 for( Integer i=0; i<nb_connected_sub_domain; ++i )
327 m_accumulate_infos[i] = new SerializeMessage(m_rank,communicating_sub_domains[i],
328 ISerializeMessage::MT_Send);
329
330 _addItemsToSend(local_ids,sub_domains_to_send,communicating_sub_domains,ids_to_send);
331 _addItemsToSend(m_waiting_local_ids,m_waiting_sub_domains_to_send,
332 communicating_sub_domains,ids_to_send);
333
334
335 if (m_is_debug){
336 info() << "-- Subdomain " << m_rank << ". NB to send: " << local_ids.size()
337 << " NB connected subdomains: " << nb_connected_sub_domain;
338
339 info() << "NB connected subdomain for " << m_rank << " : " << m_accumulate_infos.size();
340 for( Integer i=0, n=m_accumulate_infos.size(); i<n; ++i ){
341 info() << "------------- Send: rank=" << m_accumulate_infos[i]->destRank()
342 << " n=" << ids_to_send[i].size();
343 }
344 }
345
346 Int64UniqueArray items_to_send_uid;
347 Int64UniqueArray items_to_send_cells_uid; // Uniquement pour les particules;
348
349 for( Integer j=0; j<nb_connected_sub_domain; ++j ){
350 ISerializeMessage* sm = m_accumulate_infos[j];
351 // En mode bloquant, envoie toujours le message car le destinataire a posté
352 // un message de réception. Sinon, le message n'a besoin d'être envoyé que
353 // s'il contient des particules.
354 if (!ids_to_send[j].empty())
355 _serializeMessage(sm,ids_to_send[j],items_to_send_uid,
356 items_to_send_cells_uid);
357 else
358 // Le message n'est pas utile car vide.
359 delete sm;
360 }
361
362 m_accumulate_infos.clear();
363
364 // Détruit les entités qui viennent d'être envoyées
365 info(5)<<"NonBlockingParticleExchanger:: sendItems " << "local_ids "<<local_ids.size();
366 info(5)<<"NonBlockingParticleExchanger:: sendItems " << "m_waiting_local_ids "<<m_waiting_local_ids.size();
367
368 m_item_family->toParticleFamily()->removeParticles(local_ids);
369 m_item_family->toParticleFamily()->removeParticles(m_waiting_local_ids);
370 m_item_family->endUpdate();
371 m_waiting_local_ids.clear();
372 m_waiting_sub_domains_to_send.clear();
373}
374
375/*---------------------------------------------------------------------------*/
376/*---------------------------------------------------------------------------*/
377
378void NonBlockingParticleExchanger::
379_addItemsToSend(Int32ConstArrayView local_ids,
380 Int32ConstArrayView sub_domains_to_send,
381 Int32ConstArrayView communicating_sub_domains,
382 UniqueArray<SharedArray<Int32> >& ids_to_send)
383{
384 String func_name("NonBlockingParticleExchanger::_addItemsToSend()");
385 Integer nb_connected_sub_domain = ids_to_send.size();
386 // Cherche pour chaque élément à quel sous-domaine il doit être transféré.
387 // Cette recherche se fait en se basant sur les \a local_ids
388 Integer id_size = local_ids.size();
389 for( Integer i=0; i<id_size; ++i ){
390 Int32 item_local_id = local_ids[i];
391 Integer sd_to_send = sub_domains_to_send[i];
392#ifdef ARCANE_CHECK
393 if (sd_to_send==m_rank)
394 // Il s'agit d'une entité propre à ce sous-domaine
395 fatal() << func_name << "The entity with local id " << item_local_id
396 << " should not be sent to its own subdomain";
397#endif
398 // Recherche l'index du sous-domaine auquel l'entité appartient
399 // dans la liste \a sync_list
400 // TODO: utiliser une table indirect (tableau alloué au nombre de sous-domaines)
401 Integer sd_index = nb_connected_sub_domain;
402 for( Integer i_sd=0; i_sd<nb_connected_sub_domain; ++i_sd )
403 if (sd_to_send==communicating_sub_domains[i_sd]){
404 sd_index = i_sd;
405 break;
406 }
407#ifdef ARCANE_CHECK
408 if (sd_index==nb_connected_sub_domain)
409 fatal() << func_name << "Internal: bad subdomain index";
410#endif
411 ids_to_send[sd_index].add(item_local_id);
412 }
413}
414
415/*---------------------------------------------------------------------------*/
416/*---------------------------------------------------------------------------*/
417
418void NonBlockingParticleExchanger::
419_processMessages(ItemGroup item_group,Int32Array* new_particle_local_ids,
420 bool wait_all,IFunctor* functor)
421{
422 _sendPendingMessages();
423
424 if (functor){
425 {
426 Timer::Sentry ts(m_timer);
427 functor->executeFunctor();
428 }
429 m_total_time_functor += m_timer->lastActivationTime();
430 info(5) << "TimeFunctor: current=" << m_timer->lastActivationTime()
431 << " total=" << m_total_time_functor;
432 }
433
434 Integer nb_message_finished = 0;
435 {
436 Timer::Sentry ts(m_timer);
437 if (wait_all)
438 nb_message_finished = m_message_list->waitMessages(Parallel::WaitAll);
439 else{
440 if (m_can_process_non_blocking)
441 nb_message_finished = m_message_list->waitMessages(Parallel::WaitSomeNonBlocking);
442 else
443 nb_message_finished = m_message_list->waitMessages(Parallel::WaitSome);
444 //info() << "Nb finished=" << nb_message_finished << " is_block=" << m_can_process_non_blocking;
445 if (nb_message_finished==0){
446 m_can_process_non_blocking = false;
447 m_can_process_messages = true;
448 _processFinishTrackingMessage();
449 return;
450 }
451 }
452 }
453 m_total_time_waiting += m_timer->lastActivationTime();
454 info(5) << "TimeWaiting: current=" << m_timer->lastActivationTime()
455 << " total=" << m_total_time_waiting;
456
457 // Sauve les communications actuellement traitées car le traitement
458 // peut en ajouter de nouvelles
459 UniqueArray<ISerializeMessage*> current_messages(m_waiting_messages);
460 m_waiting_messages.clear();
461
462 Int64UniqueArray items_to_create_id;
463 Int64UniqueArray items_to_create_cells_id;
464 for( Integer i=0, is=current_messages.size(); i<is; ++i ){
465 ISerializeMessage* sm = current_messages[i];
466 if (sm->finished()){
467 if (!sm->isSend()){
468 _deserializeMessage(sm,items_to_create_id,items_to_create_cells_id,item_group,new_particle_local_ids);
469 ++m_nb_receive_message;
470 }
471 delete sm;
472 }
473 else
474 m_waiting_messages.add(sm);
475 }
476}
477
478/*---------------------------------------------------------------------------*/
479/*---------------------------------------------------------------------------*/
480
481void NonBlockingParticleExchanger::
482_sendPendingMessages()
483{
484 IParallelMng* pm = m_parallel_mng;
485
486 _checkNeedReceiveMessage();
487
488 if (!m_message_list.get())
489 m_message_list = pm->createSerializeMessageListRef();
490
491 {
492 Timer::Sentry ts(m_timer);
493 // Ajoute les messages en attente de traitement
494 Integer nb_message = m_pending_messages.size();
495 for( Integer i=0; i<nb_message; ++i ){
496 m_message_list->addMessage(m_pending_messages[i]);
497 m_waiting_messages.add(m_pending_messages[i]);
498 }
499 m_message_list->processPendingMessages();
500 m_pending_messages.clear();
501 }
502 info(5) << "TimeSendMessages=" << m_timer->lastActivationTime()
503 << " buffersize=" << m_waiting_local_ids.size();
504}
505
506/*---------------------------------------------------------------------------*/
507/*---------------------------------------------------------------------------*/
508
509void NonBlockingParticleExchanger::
510_serializeMessage(ISerializeMessage* sm,
511 Int32ConstArrayView acc_ids,
512 Int64Array& items_to_send_uid,
513 Int64Array& items_to_send_cells_uid)
514{
515 ParticleInfoListView internal_items(m_item_family);
516
517 ISerializer* sbuf = sm->serializer();
518 sbuf->setMode(ISerializer::ModeReserve);
519
520 //for( Integer j=0; j<nb_connected_sub_domain; ++j ){
521 //ConstArrayView<Integer> acc_ids = m_ids_to_send[j];
522 Integer nb_item = acc_ids.size();
523 // Réserve pour le type de message
524 sbuf->reserveInteger(1);
525 if (m_want_fast_send_particles){
526 // Réserve pour le nombre de particules traitées
527 sbuf->reserveInt64(1);
528 }
529 // Réserve pour le rang de l'expéditeur
530 sbuf->reserveInt32(1);
531 // Réserve pour le nombre de uniqueId()
532 sbuf->reserveInt64(1);
533 // Réserve pour les uniqueId() des particules
534 sbuf->reserveSpan(eBasicDataType::Int64,nb_item);
535 // Réserve pour les uniqueId() des mailles dans lesquelles se trouvent les particules
536 sbuf->reserveSpan(eBasicDataType::Int64,nb_item);
537
538 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
539 IVariable* var = *i_var;
540 var->serialize(sbuf,acc_ids);
541 }
542
543 // Sérialise les données en écriture
544 sbuf->allocateBuffer();
545 sbuf->setMode(ISerializer::ModePut);
546
547 sbuf->putInteger(MESSAGE_EXCHANGE);
548 if (m_want_fast_send_particles){
549 sbuf->putInt64(m_nb_particle_finished_exchange);
550 m_nb_particle_finished_exchange = 0;
551 }
552 sbuf->putInt32(m_rank);
553 sbuf->putInt64(nb_item);
554 items_to_send_uid.resize(nb_item);
555 items_to_send_cells_uid.resize(nb_item);
556
557 for( Integer z=0; z<nb_item; ++z ){
558 Particle item = internal_items[acc_ids[z]];
559 items_to_send_uid[z] = item.uniqueId();
560 bool has_cell = item.hasCell();
561 items_to_send_cells_uid[z] = (has_cell) ? item.cell().uniqueId() : NULL_ITEM_UNIQUE_ID;
562#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
563#if 0
564 info() << "Particle BufID=" << acc_ids[z]
565 << " LID=" << item.localId()
566 << " UID=" << items_to_send_uid[z]
567 << " CellIUID=" << items_to_send_cells_uid[z]
568 << " (owner=" << item.cell().owner() << ")";
569#endif
570#endif
571 }
572 sbuf->putSpan(items_to_send_uid);
573 sbuf->putSpan(items_to_send_cells_uid);
574
575 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
576 IVariable* var = *i_var;
577 var->serialize(sbuf,acc_ids);
578 }
579
580 m_pending_messages.add(sm);
581}
582
583/*---------------------------------------------------------------------------*/
584/*---------------------------------------------------------------------------*/
585
586void NonBlockingParticleExchanger::
587_deserializeMessage(ISerializeMessage* message,
588 Int64Array& items_to_create_unique_id,
589 Int64Array& items_to_create_cells_id,
590 ItemGroup item_group,
591 Int32Array* new_particle_local_ids)
592{
593
594 IMesh* mesh = m_item_family->mesh();
595 ISerializer* sbuf = message->serializer();
596
597 // Indique qu'on souhaite sérialiser les données en lecture
598 sbuf->setMode(ISerializer::ModeGet);
599 sbuf->setReadMode(ISerializer::ReadReplace);
600 Int32UniqueArray items_to_create_local_id;
601 Int32UniqueArray cells_lid;
602
603 Integer message_type = sbuf->getInteger();
604 info(4) << "Deserialise message_type=" << (int)message_type;
605 switch(message_type){
606 case MESSAGE_EXCHANGE:
607 {
608 m_need_general_receive = true;
609 if (m_want_fast_send_particles){
610 Int64 nb_finished = sbuf->getInt64();
611 m_nb_particle_finished_exchange += nb_finished;
612 }
613 Int32 orig_rank = sbuf->getInt32();
614 Int64 nb_item = sbuf->getInt64();
615 if (m_is_debug)
616 info() << "------------- Receive: rank=" << orig_rank << " particle nb=" << nb_item
617 << " (orig_rank=" << message->destination() << ")";
618
619 //if (nb_item!=0)
620 //info() << "Receiving particules n=" << nb_item;
621 items_to_create_local_id.resize(nb_item);
622 items_to_create_unique_id.resize(nb_item);
623 items_to_create_cells_id.resize(nb_item);
624 sbuf->getSpan(items_to_create_unique_id);
625 sbuf->getSpan(items_to_create_cells_id);
626#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
627 //info() << "Recv from SID " << sync_infos[i].subDomain() << " N=" << nb_item;
628#if 0
629 for( Integer z=0; z<nb_item; ++z ){
630 info() << "Particle UID=" << items_to_create_unique_id[z]
631 << " CellIUID=" << items_to_create_cells_id[z];
632 }
633#endif
634#endif
635 cells_lid.resize(nb_item);
636 mesh->cellFamily()->itemsUniqueIdToLocalId(cells_lid,items_to_create_cells_id);
637
638 items_to_create_local_id.resize(nb_item);
639 ParticleVectorView particles_view = m_item_family->toParticleFamily()->addParticles(items_to_create_unique_id,
640 cells_lid,
641 items_to_create_local_id);
642 info(5) << "Nb create=" << particles_view.size();
643
644 // Notifie la famille qu'on a fini nos modifs.
645 // Après appel à cette méthode, les variables sont à nouveau utilisables
646 m_item_family->endUpdate();
647
648 // Converti les uniqueId() récupérée en localId() et pour les particules
649 // renseigne la maille correspondante
650 ParticleInfoListView internal_items(m_item_family);
651 //ItemInternalList internal_cells(mesh->itemsInternal(IK_Cell));
652 //m_item_family->itemsUniqueIdToLocalId(items_to_create_local_id,items_to_create_unique_id);
653
654 for( Integer z=0; z<nb_item; ++z ){
655 Particle item = internal_items[items_to_create_local_id[z]];
656 //item.setCell( internal_cells[cells_lid[z]] );
657 // Je suis le nouveau propriétaire (TODO: ne pas faire ici)
658 item.mutableItemBase().setOwner(m_rank,m_rank);
659 }
660 if (!item_group.null())
661 item_group.addItems(items_to_create_local_id,false);
662 if (new_particle_local_ids)
663 new_particle_local_ids->addRange(items_to_create_local_id);
664
665 for( VariableCollection::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
666 IVariable* var = *i_var;
667 var->serialize(sbuf,items_to_create_local_id);
668 }
669 }
670 break;
671 case MESSAGE_NB_FINISH_EXCHANGE:
672 {
673 m_need_general_receive = true;
674 // Indique qu'on peut continuer à recevoir des messages, celui-ci n'étant pas
675 // significatif
676 m_can_process_messages = true;
677 Int64 nb_particle = sbuf->getInt64();
678 Int32 orig_rank = sbuf->getInt32();
679 if (m_is_debug)
680 info() << "MESSAGE_NB_FINISH_EXCHANGE nb=" << nb_particle << " (from rank=" << orig_rank << ")";
681 _addFinishExchangeParticle(nb_particle);
682 }
683 break;
684 case MESSAGE_FINISH_EXCHANGE_STATUS:
685 {
686 m_nb_total_particle_finish_exchange = sbuf->getInt64();
687 m_exchange_finished = (m_nb_total_particle_finish_exchange==m_nb_total_particle);
688 //#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
689 info() << "** RECEIVING FINISH EXCHANGE " << m_exchange_finished
690 << " finish=" << m_nb_total_particle_finish_exchange
691 << " total=" << m_nb_total_particle;
692 //#endif
693 //if (m_exchange_finished)
694 //warning() << "Exchange finished ! " << m_current_iteration;
695 }
696 break;
697 case MESSAGE_CHANGE_BLOCKING:
698 {
699 m_need_general_receive = true;
700
701 Integer nb_blocking_size = sbuf->getInteger();
702 // Il faut être certain que le nouveau \a blocking_size
703 // est inférieur au courant, ce qui peut arriver lorsqu'on
704 // recoit plusieurs messages de ce type en même temps.
705 if (nb_blocking_size<m_nb_blocking_size)
706 m_nb_blocking_size = nb_blocking_size;
707 info(4) << "** RECEIVING CHANGE BLOCKING"
708 << " new_blocking_size=" << m_nb_blocking_size;
709 // Comme il peut y avoir des particules en attente, il faut
710 // maintenant les envoyer
711 if (m_waiting_local_ids.size()>0)
712 _generateSendItemsMessages(Int32ConstArrayView(),Int32ConstArrayView());
713 }
714 break;
715 }
716}
717
718/*---------------------------------------------------------------------------*/
719/*---------------------------------------------------------------------------*/
720
721void NonBlockingParticleExchanger::
722reset()
723{
724 if (!m_waiting_messages.empty())
725 ARCANE_FATAL("reset() waiting parallel requests");
726 _clearMessages();
727}
728
729/*---------------------------------------------------------------------------*/
730/*---------------------------------------------------------------------------*/
731
732void NonBlockingParticleExchanger::
733_processFinishTrackingMessage()
734{
735 // Si processeur == m_master_proc,
736 // - réceptionne des autres leur nombre fini.
737 // - fait le total
738 // - le renvoi à tout le monde
739 // Si processeur != m_master_proc
740 // - envoie à m_master_proc la valeur \a nb_finish_tracking_particle
741 // - réceptionne de m_master_proc le nombre total ayant terminé
742 if (m_rank==m_master_proc){
743 _addFinishExchangeParticle(m_nb_particle_finished_exchange);
744 }
745 else{
746 // Envoie le nombre de particules au processeur \a master_proc
747 if (m_nb_particle_finished_exchange!=0) {
748 info(4) << "Send to master proc (" << m_master_proc << ") nb_finish=" << m_nb_particle_finished_exchange;
749 SerializeMessage* sm = new SerializeMessage(m_rank,m_master_proc,ISerializeMessage::MT_Send);
750 ISerializer* sbuf = sm->serializer();
751 sbuf->setMode(ISerializer::ModeReserve);
752 sbuf->reserveInteger(1);
753 sbuf->reserveInt64(1);
754 sbuf->reserveInt32(1);
755 sbuf->allocateBuffer();
756 sbuf->setMode(ISerializer::ModePut);
757 sbuf->putInteger(MESSAGE_NB_FINISH_EXCHANGE);
758 sbuf->putInt64(m_nb_particle_finished_exchange);
759 sbuf->putInt32(m_rank);
760 m_pending_messages.add(sm);
761 }
762 }
763 m_nb_particle_finished_exchange = 0;
764}
765
766/*---------------------------------------------------------------------------*/
767/*---------------------------------------------------------------------------*/
768
769void NonBlockingParticleExchanger::
770_sendFinishExchangeParticle()
771{
772 Int32 nb_rank = m_parallel_mng->commSize();
773 if (m_rank!=m_master_proc || m_end_message_sended)
774 return;
775 m_end_message_sended = true;
776 info(4) << " ** ** SEND FINISH EXCHANGE PARTICLE2";
777 for( Integer i=0; i<nb_rank; ++i ){
778 if (i==m_master_proc)
779 continue;
780 SerializeMessage* sm = new SerializeMessage(m_rank,i,ISerializeMessage::MT_Send);
781 ISerializer* sbuf = sm->serializer();
782 sbuf->setMode(ISerializer::ModeReserve);
783 sbuf->reserveInteger(1);
784 sbuf->reserveInt64(1);
785 sbuf->allocateBuffer();
786 sbuf->setMode(ISerializer::ModePut);
787 sbuf->putInteger(MESSAGE_FINISH_EXCHANGE_STATUS);
788 sbuf->putInt64(m_nb_total_particle_finish_exchange);
789 m_pending_messages.add(sm);
790 }
791}
792
793/*---------------------------------------------------------------------------*/
794/*---------------------------------------------------------------------------*/
795
796void NonBlockingParticleExchanger::
797_addFinishExchangeParticle(Int64 nb_particle_finish_exchange)
798{
799 m_nb_total_particle_finish_exchange += nb_particle_finish_exchange;
800 Int32 nb_rank = m_parallel_mng->commSize();
801 Int64 nb_rank_as_int64 = nb_rank;
802 //#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
803 info(4) << "** RECEIVING FINISH EXCHANGE n=" << nb_particle_finish_exchange
804 << " totalfinish=" << m_nb_total_particle_finish_exchange
805 << " total=" << m_nb_total_particle;
806 //#endif
807 Int64 remaining_particle = m_nb_total_particle - m_nb_total_particle_finish_exchange;
808 if (remaining_particle==0){
809 m_exchange_finished = true;
810 m_need_general_receive = false;
811 info() << "** ** FINISH TRACKING NB_RECV=" << m_nb_receive_message
812 << " (Date=" << platform::getCurrentDateTime() << ")";
813 _sendFinishExchangeParticle();
814 }
815 else if (remaining_particle<(m_nb_blocking_size*nb_rank_as_int64)){
816 //Integer nb_rank = subDomain()->nbSubDomain();
817 //m_nb_blocking_size /= 100;
818 m_nb_blocking_size = 0;
819 warning() << "** ** CHANGE BLOCKING NEW_SIZE " << m_nb_blocking_size
820 << " REMAING_PARTICLE " << remaining_particle
821 << " (Date=" << platform::getCurrentDateTime() << ")";
822
823 // Comme il peut y avoir des particules en attente, il faut
824 // maintenant les envoyer
825 if (m_waiting_local_ids.size()>0)
826 _generateSendItemsMessages(Int32ConstArrayView(),Int32ConstArrayView());
827 for( Int32 i=0; i<nb_rank; ++i ){
828 if (i==m_master_proc)
829 continue;
830 SerializeMessage* sm = new SerializeMessage(m_rank,i,ISerializeMessage::MT_Send);
831 ISerializer* sbuf = sm->serializer();
832 sbuf->setMode(ISerializer::ModeReserve);
833 sbuf->reserveInteger(1);
834 sbuf->reserveInteger(1);
835 sbuf->allocateBuffer();
836 sbuf->setMode(ISerializer::ModePut);
837 sbuf->putInteger(MESSAGE_CHANGE_BLOCKING);
838 sbuf->putInteger(m_nb_blocking_size);
839 m_pending_messages.add(sm);
840 }
841 }
842}
843
844
845/*---------------------------------------------------------------------------*/
846/*---------------------------------------------------------------------------*/
847
848void NonBlockingParticleExchanger::
849_checkInitialized()
850{
851 if (!m_item_family)
852 ARCANE_FATAL("method initialized() not called");
853 _clearMessages();
854}
855
856/*---------------------------------------------------------------------------*/
857/*---------------------------------------------------------------------------*/
858
859ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(NonBlockingParticleExchanger,IParticleExchanger,
860 NonBlockingParticleExchanger);
861
862/*---------------------------------------------------------------------------*/
863/*---------------------------------------------------------------------------*/
864
865} // End namespace Arcane::mesh
866
867/*---------------------------------------------------------------------------*/
868/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
void clear()
Supprime les éléments du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Interface d'une famille d'entités.
Definition IItemFamily.h:84
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual char reduce(eReduceType rt, char v)=0
Effectue la réduction de type rt sur le réel v et retourne la valeur.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Int32 size() const
Nombre d'éléments du vecteur.
TraceMessage info() const
Flot pour un message d'information.
VariableList m_variables_to_exchange
Liste des variables à échanger.
Integer m_nb_blocking_size
Nombre de particules restantes avant de passer en mode bloquant.
ItemVectorViewT< Particle > ParticleVectorView
Vue sur un vecteur de particules.
Definition ItemTypes.h:309
@ ReduceSum
Somme des valeurs.
ARCCORE_BASE_EXPORT String getCurrentDateTime()
Date et l'heure courante sous la forme ISO 8601.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:426
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:569
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:428
Array< Int32 > Int32Array
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:214