Arcane  v3.15.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
NonBlockingParticleExchanger.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* NonBlockingParticleExchanger.cc (C) 2000-2024 */
9/* */
10/* Echangeur de particules. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/mesh/NonBlockingParticleExchanger.h"
15
16#include "arcane/utils/List.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/IFunctor.h"
19#include "arcane/utils/PlatformUtils.h"
20
21#include "arcane/core/ItemGroup.h"
22#include "arcane/core/ItemVector.h"
23#include "arcane/core/IItemFamily.h"
24#include "arcane/core/IParticleFamily.h"
25#include "arcane/core/IParallelMng.h"
26#include "arcane/core/IVariableMng.h"
27#include "arcane/core/IVariable.h"
28#include "arcane/core/IMesh.h"
29#include "arcane/core/Item.h"
30#include "arcane/core/Timer.h"
31#include "arcane/core/SerializeMessage.h"
32#include "arcane/core/ISerializeMessageList.h"
33#include "arcane/core/CommonVariables.h"
34#include "arcane/core/FactoryService.h"
35
36//#define ARCANE_DEBUG_EXCHANGE_ITEMS
37
38/*---------------------------------------------------------------------------*/
39/*---------------------------------------------------------------------------*/
40
41namespace Arcane::mesh
42{
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
46
47NonBlockingParticleExchanger::
48NonBlockingParticleExchanger(const ServiceBuildInfo& sbi)
49: BasicService(sbi)
50, m_item_family(nullptr)
51, m_parallel_mng(sbi.mesh()->parallelMng())
52, m_rank(m_parallel_mng->commRank())
53, m_timer(new Timer(m_parallel_mng->timerMng(),"NonBlockingParticleExchanger",Timer::TimerReal))
54, m_total_time_functor(0.)
55, m_total_time_waiting(0.)
56, m_nb_total_particle_finish_exchange(0)
57, m_nb_total_particle(0)
58, m_nb_original_blocking_size(0)
59, m_nb_blocking_size(m_nb_original_blocking_size)
60, m_exchange_finished(true)
61, m_master_proc(0)
62, m_need_general_receive(false)
63, m_end_message_sended(false)
64, m_can_process_messages(true)
65, m_can_process_non_blocking(false)
66, m_want_process_non_blocking(false)
67, m_want_fast_send_particles(true)
68, m_nb_receive_message(0)
69, m_nb_particle_finished_exchange(0)
70, m_verbose_level(1)
71, m_is_debug(false)
72{
73 // m_want_fast_send_particles permet d'envoyer en même temps que
74 // les particules le nombre qui ont finie la poursuite. Cela permet
75 // d'éviter des messages supplémentaires.
76#if ARCANE_DEBUG_EXCHANGE_ITEMS
77 m_is_debug = true;
78#endif
79}
80
81/*---------------------------------------------------------------------------*/
82/*---------------------------------------------------------------------------*/
83
84NonBlockingParticleExchanger::
85~NonBlockingParticleExchanger()
86{
87 // On ne peut pas faire de throw dans le destructeur donc on affiche
88 // des avertissements de compilation
89 if (!m_pending_messages.empty() || !m_waiting_messages.empty()){
90 String s = String::format("pending or waiting messages: nb_pending={0} nb_waiting=",
91 m_pending_messages.size(),m_waiting_messages.size());
92 warning() << s;
93 }
94
95 if (!m_waiting_local_ids.empty()){
96 warning() << String::format("pending particles: nb_pending=",m_waiting_local_ids.size());
97 }
98
99 delete m_timer;
100}
101
102/*---------------------------------------------------------------------------*/
103/*---------------------------------------------------------------------------*/
104
105void NonBlockingParticleExchanger::
106initialize(IItemFamily* item_family)
107{
108 m_item_family = item_family;
109}
110
111/*---------------------------------------------------------------------------*/
112/*---------------------------------------------------------------------------*/
113
114void NonBlockingParticleExchanger::
115_clearMessages()
116{
117 for( Integer i=0, is=m_accumulate_infos.size(); i<is; ++i ){
118 delete m_accumulate_infos[i];
119 m_accumulate_infos[i] = 0;
120 }
121 m_accumulate_infos.clear();
122}
123
124/*---------------------------------------------------------------------------*/
125/*---------------------------------------------------------------------------*/
126
127void NonBlockingParticleExchanger::
128beginNewExchange(Integer i_nb_particle)
129{
130 _checkInitialized();
131 IParallelMng* pm = m_parallel_mng;
132
133 m_end_message_sended = false;
134 m_exchange_finished = false;
135 m_nb_blocking_size = m_nb_original_blocking_size;
136 m_nb_receive_message = 0;
137 m_nb_particle_finished_exchange = 0;
138
139 //TODO: utiliser un tag spécifique pour cet échange.
141 m_nb_total_particle = pm->reduce(Parallel::ReduceSum,nb_particle);
142 info() << "BEGIN TRACKING TOTAL FLYING = " << m_nb_total_particle
143 << " (local=" << nb_particle << ") "
144 << " (Date=" << platform::getCurrentDateTime() << ")";
145
146 m_nb_total_particle_finish_exchange = 0;
147
148 m_need_general_receive = true;
149
150 // Récupère la liste des variables à transferer.
151 // Il s'agit des variables qui ont la même famille que celle passée
152 // en paramètre.
153 // IMPORTANT: tous les sous-domaines doivent avoir ces mêmes variables
154 m_variables_to_exchange.clear();
155 m_item_family->usedVariables(m_variables_to_exchange);
156 m_variables_to_exchange.sortByName(true);
157}
158
159/*---------------------------------------------------------------------------*/
160/*---------------------------------------------------------------------------*/
161
162bool NonBlockingParticleExchanger::
163exchangeItems(Integer nb_particle_finish_exchange,
166 IFunctor* functor)
167{
168 //TODO a supprimer si passe par sendItems()
169 m_nb_particle_finished_exchange += nb_particle_finish_exchange;
170 return _exchangeItems(local_ids,sub_domains_to_send,item_group,0,functor);
171}
172
173/*---------------------------------------------------------------------------*/
174/*---------------------------------------------------------------------------*/
175
176bool NonBlockingParticleExchanger::
177exchangeItems(Integer nb_particle_finish_exchange,
181 IFunctor* functor)
182{
183 //TODO a supprimer si passe par sendItems()
184 m_nb_particle_finished_exchange += nb_particle_finish_exchange;
185 return _exchangeItems(local_ids,sub_domains_to_send,ItemGroup(),new_particle_local_ids,functor);
186}
187
188/*---------------------------------------------------------------------------*/
189/*---------------------------------------------------------------------------*/
190
191void NonBlockingParticleExchanger::
192sendItems(Integer nb_particle_finish_exchange,
195{
196 m_nb_particle_finished_exchange += nb_particle_finish_exchange;
197 _checkSendItems(local_ids,sub_domains_to_send);
198 _sendPendingMessages();
199}
200
201/*---------------------------------------------------------------------------*/
202/*---------------------------------------------------------------------------*/
203
204bool NonBlockingParticleExchanger::
206{
207 return _waitMessages(nb_pending_particle,ItemGroup(),new_particle_local_ids,functor);
208}
209
210/*---------------------------------------------------------------------------*/
211/*---------------------------------------------------------------------------*/
212
213void NonBlockingParticleExchanger::
214_checkSendItems(Int32ConstArrayView local_ids,
216{
217 Integer nb_particle = local_ids.size();
218 Integer nb_waiting_local_ids = m_waiting_local_ids.size();
219 Integer nb_waiting_sub_domains_to_send = m_waiting_sub_domains_to_send.size();
220 if ((nb_particle+nb_waiting_local_ids)>=m_nb_blocking_size){
221 _generateSendItemsMessages(local_ids,sub_domains_to_send);
222 }
223 else{
224 // Met les particules dans un tampon avant de les envoyer
225 m_waiting_local_ids.resize(nb_waiting_local_ids+nb_particle);
226 m_waiting_sub_domains_to_send.resize(nb_waiting_sub_domains_to_send+nb_particle);
227 for( Integer i=0; i<nb_particle; ++i ){
228 m_waiting_local_ids[nb_waiting_local_ids+i] = local_ids[i];
229 m_waiting_sub_domains_to_send[nb_waiting_sub_domains_to_send+i] = sub_domains_to_send[i];
230 }
231 }
232}
233
234/*---------------------------------------------------------------------------*/
235/*---------------------------------------------------------------------------*/
236
237bool NonBlockingParticleExchanger::
238_exchangeItems(Int32ConstArrayView local_ids,
239 Int32ConstArrayView sub_domains_to_send,ItemGroup item_group,
240 Int32Array* new_particle_local_ids,
241 IFunctor* functor)
242{
243 if (m_want_fast_send_particles){
244 if (local_ids.empty())
245 _processFinishTrackingMessage();
246 }
247 else
248 if (!m_want_process_non_blocking)
249 _processFinishTrackingMessage();
250 if (m_exchange_finished){
251 _sendFinishExchangeParticle();
252 m_need_general_receive = false;
253 }
254 _checkNeedReceiveMessage();
255
256 _checkSendItems(local_ids,sub_domains_to_send);
257
258 return _waitMessages(0,item_group,new_particle_local_ids,functor);
259}
260
261/*---------------------------------------------------------------------------*/
262/*---------------------------------------------------------------------------*/
263
264bool NonBlockingParticleExchanger::
265_waitMessages(Integer nb_pending_particle,ItemGroup item_group,
266 Int32Array* new_particle_local_ids,IFunctor* functor)
267{
268 ARCANE_UNUSED(nb_pending_particle);
269
270 if (!item_group.null())
271 item_group.clear();
272
273 m_can_process_messages = true;
274 m_can_process_non_blocking = m_want_process_non_blocking;
275 while (m_can_process_messages && !m_exchange_finished){
276 m_can_process_messages = false;
277 _processMessages(item_group,new_particle_local_ids,false,functor);
278 }
279
280 if (m_exchange_finished){
281 info(5) << " ** EXCHANGE finished: ";
282 // Cela est nécessaire pour être certain que les messages
283 // de fin sont bien tous réceptionnés
284 _processMessages(item_group,new_particle_local_ids,true,0);
285 info(5) << " ** EXCHANGE finished END: ";
286 }
287
288 info(5) << " ** RETURN EXCHANGE m_exchange_finished: " << m_exchange_finished;
289 return m_exchange_finished;
290}
291
292/*---------------------------------------------------------------------------*/
293/*---------------------------------------------------------------------------*/
294
295void NonBlockingParticleExchanger::
296_checkNeedReceiveMessage()
297{
298 if (m_need_general_receive){
299 auto sm = new SerializeMessage(m_rank,A_NULL_RANK,ISerializeMessage::MT_Recv);
300 m_pending_messages.add(sm);
301 m_need_general_receive = false;
302 }
303}
304
305/*---------------------------------------------------------------------------*/
306/*---------------------------------------------------------------------------*/
307
308void NonBlockingParticleExchanger::
309_generateSendItemsMessages(Int32ConstArrayView local_ids,
310 Int32ConstArrayView sub_domains_to_send)
311{
312 Timer::Phase tphase(m_parallel_mng->timeStats(),TP_Communication);
313
314 IMesh* mesh = m_item_family->mesh();
315
316 Int32UniqueArray communicating_sub_domains;
317 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
318
319 Integer nb_connected_sub_domain = communicating_sub_domains.size();
320 //Integer max_sub_domain_id = 0;
321 UniqueArray< SharedArray<Int32> > ids_to_send(nb_connected_sub_domain);
322 // Infos pour chaque sous-domaine connecté
323 //_clearMessages();
324 m_accumulate_infos.clear();
325 m_accumulate_infos.resize(nb_connected_sub_domain);
326 for( Integer i=0; i<nb_connected_sub_domain; ++i )
327 m_accumulate_infos[i] = new SerializeMessage(m_rank,communicating_sub_domains[i],
328 ISerializeMessage::MT_Send);
329
330 _addItemsToSend(local_ids,sub_domains_to_send,communicating_sub_domains,ids_to_send);
331 _addItemsToSend(m_waiting_local_ids,m_waiting_sub_domains_to_send,
332 communicating_sub_domains,ids_to_send);
333
334
335 if (m_is_debug){
336 info() << "-- Subdomain " << m_rank << ". NB to send: " << local_ids.size()
337 << " NB connected subdomains: " << nb_connected_sub_domain;
338
339 info() << "NB connected subdomain for " << m_rank << " : " << m_accumulate_infos.size();
340 for( Integer i=0, n=m_accumulate_infos.size(); i<n; ++i ){
341 info() << "------------- Send: rank=" << m_accumulate_infos[i]->destRank()
342 << " n=" << ids_to_send[i].size();
343 }
344 }
345
346 Int64UniqueArray items_to_send_uid;
347 Int64UniqueArray items_to_send_cells_uid; // Uniquement pour les particules;
348
349 for( Integer j=0; j<nb_connected_sub_domain; ++j ){
350 ISerializeMessage* sm = m_accumulate_infos[j];
351 // En mode bloquant, envoie toujours le message car le destinataire a posté
352 // un message de réception. Sinon, le message n'a besoin d'être envoyé que
353 // s'il contient des particules.
354 if (!ids_to_send[j].empty())
355 _serializeMessage(sm,ids_to_send[j],items_to_send_uid,
356 items_to_send_cells_uid);
357 else
358 // Le message n'est pas utile car vide.
359 delete sm;
360 }
361
362 m_accumulate_infos.clear();
363
364 // Détruit les entités qui viennent d'être envoyées
365 info(5)<<"NonBlockingParticleExchanger:: sendItems " << "local_ids "<<local_ids.size();
366 info(5)<<"NonBlockingParticleExchanger:: sendItems " << "m_waiting_local_ids "<<m_waiting_local_ids.size();
367
368 m_item_family->toParticleFamily()->removeParticles(local_ids);
369 m_item_family->toParticleFamily()->removeParticles(m_waiting_local_ids);
370 m_item_family->endUpdate();
371 m_waiting_local_ids.clear();
372 m_waiting_sub_domains_to_send.clear();
373}
374
375/*---------------------------------------------------------------------------*/
376/*---------------------------------------------------------------------------*/
377
378void NonBlockingParticleExchanger::
379_addItemsToSend(Int32ConstArrayView local_ids,
380 Int32ConstArrayView sub_domains_to_send,
381 Int32ConstArrayView communicating_sub_domains,
382 UniqueArray<SharedArray<Int32> >& ids_to_send)
383{
384 String func_name("NonBlockingParticleExchanger::_addItemsToSend()");
385 Integer nb_connected_sub_domain = ids_to_send.size();
386 // Cherche pour chaque élément à quel sous-domaine il doit être transféré.
387 // Cette recherche se fait en se basant sur les \a local_ids
388 Integer id_size = local_ids.size();
389 for( Integer i=0; i<id_size; ++i ){
390 Int32 item_local_id = local_ids[i];
391 Integer sd_to_send = sub_domains_to_send[i];
392#ifdef ARCANE_CHECK
393 if (sd_to_send==m_rank)
394 // Il s'agit d'une entité propre à ce sous-domaine
395 fatal() << func_name << "The entity with local id " << item_local_id
396 << " should not be sent to its own subdomain";
397#endif
398 // Recherche l'index du sous-domaine auquel l'entité appartient
399 // dans la liste \a sync_list
400 // TODO: utiliser une table indirect (tableau alloué au nombre de sous-domaines)
401 Integer sd_index = nb_connected_sub_domain;
402 for( Integer i_sd=0; i_sd<nb_connected_sub_domain; ++i_sd )
403 if (sd_to_send==communicating_sub_domains[i_sd]){
404 sd_index = i_sd;
405 break;
406 }
407#ifdef ARCANE_CHECK
408 if (sd_index==nb_connected_sub_domain)
409 fatal() << func_name << "Internal: bad subdomain index";
410#endif
411 ids_to_send[sd_index].add(item_local_id);
412 }
413}
414
415/*---------------------------------------------------------------------------*/
416/*---------------------------------------------------------------------------*/
417
418void NonBlockingParticleExchanger::
419_processMessages(ItemGroup item_group,Int32Array* new_particle_local_ids,
420 bool wait_all,IFunctor* functor)
421{
422 _sendPendingMessages();
423
424 if (functor){
425 {
426 Timer::Sentry ts(m_timer);
427 functor->executeFunctor();
428 }
429 m_total_time_functor += m_timer->lastActivationTime();
430 info(5) << "TimeFunctor: current=" << m_timer->lastActivationTime()
431 << " total=" << m_total_time_functor;
432 }
433
434 Integer nb_message_finished = 0;
435 {
436 Timer::Sentry ts(m_timer);
437 if (wait_all)
438 nb_message_finished = m_message_list->waitMessages(Parallel::WaitAll);
439 else{
440 if (m_can_process_non_blocking)
441 nb_message_finished = m_message_list->waitMessages(Parallel::WaitSomeNonBlocking);
442 else
443 nb_message_finished = m_message_list->waitMessages(Parallel::WaitSome);
444 //info() << "Nb finished=" << nb_message_finished << " is_block=" << m_can_process_non_blocking;
445 if (nb_message_finished==0){
446 m_can_process_non_blocking = false;
447 m_can_process_messages = true;
448 _processFinishTrackingMessage();
449 return;
450 }
451 }
452 }
453 m_total_time_waiting += m_timer->lastActivationTime();
454 info(5) << "TimeWaiting: current=" << m_timer->lastActivationTime()
455 << " total=" << m_total_time_waiting;
456
457 // Sauve les communications actuellement traitées car le traitement
458 // peut en ajouter de nouvelles
459 UniqueArray<ISerializeMessage*> current_messages(m_waiting_messages);
460 m_waiting_messages.clear();
461
462 Int64UniqueArray items_to_create_id;
463 Int64UniqueArray items_to_create_cells_id;
464 for( Integer i=0, is=current_messages.size(); i<is; ++i ){
465 ISerializeMessage* sm = current_messages[i];
466 if (sm->finished()){
467 if (!sm->isSend()){
468 _deserializeMessage(sm,items_to_create_id,items_to_create_cells_id,item_group,new_particle_local_ids);
469 ++m_nb_receive_message;
470 }
471 delete sm;
472 }
473 else
474 m_waiting_messages.add(sm);
475 }
476}
477
478/*---------------------------------------------------------------------------*/
479/*---------------------------------------------------------------------------*/
480
481void NonBlockingParticleExchanger::
482_sendPendingMessages()
483{
484 IParallelMng* pm = m_parallel_mng;
485
486 _checkNeedReceiveMessage();
487
488 if (!m_message_list.get())
489 m_message_list = pm->createSerializeMessageListRef();
490
491 {
492 Timer::Sentry ts(m_timer);
493 // Ajoute les messages en attente de traitement
494 Integer nb_message = m_pending_messages.size();
495 for( Integer i=0; i<nb_message; ++i ){
496 m_message_list->addMessage(m_pending_messages[i]);
497 m_waiting_messages.add(m_pending_messages[i]);
498 }
499 m_message_list->processPendingMessages();
500 m_pending_messages.clear();
501 }
502 info(5) << "TimeSendMessages=" << m_timer->lastActivationTime()
503 << " buffersize=" << m_waiting_local_ids.size();
504}
505
506/*---------------------------------------------------------------------------*/
507/*---------------------------------------------------------------------------*/
508
509void NonBlockingParticleExchanger::
510_serializeMessage(ISerializeMessage* sm,
511 Int32ConstArrayView acc_ids,
512 Int64Array& items_to_send_uid,
513 Int64Array& items_to_send_cells_uid)
514{
515 ParticleInfoListView internal_items(m_item_family);
516
517 ISerializer* sbuf = sm->serializer();
518 sbuf->setMode(ISerializer::ModeReserve);
519
520 //for( Integer j=0; j<nb_connected_sub_domain; ++j ){
521 //ConstArrayView<Integer> acc_ids = m_ids_to_send[j];
522 Integer nb_item = acc_ids.size();
523 // Réserve pour le type de message
524 sbuf->reserveInteger(1);
525 if (m_want_fast_send_particles){
526 // Réserve pour le nombre de particules traitées
527 sbuf->reserveInt64(1);
528 }
529 // Réserve pour le rang de l'expéditeur
530 sbuf->reserveInt32(1);
531 // Réserve pour le nombre de uniqueId()
532 sbuf->reserveInt64(1);
533 // Réserve pour les uniqueId() des particules
534 sbuf->reserveSpan(eBasicDataType::Int64,nb_item);
535 // Réserve pour les uniqueId() des mailles dans lesquelles se trouvent les particules
536 sbuf->reserveSpan(eBasicDataType::Int64,nb_item);
537
538 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
539 IVariable* var = *i_var;
540 var->serialize(sbuf,acc_ids);
541 }
542
543 // Sérialise les données en écriture
544 sbuf->allocateBuffer();
545 sbuf->setMode(ISerializer::ModePut);
546
547 sbuf->putInteger(MESSAGE_EXCHANGE);
548 if (m_want_fast_send_particles){
549 sbuf->putInt64(m_nb_particle_finished_exchange);
550 m_nb_particle_finished_exchange = 0;
551 }
552 sbuf->putInt32(m_rank);
553 sbuf->putInt64(nb_item);
554 items_to_send_uid.resize(nb_item);
555 items_to_send_cells_uid.resize(nb_item);
556
557 for( Integer z=0; z<nb_item; ++z ){
558 Particle item = internal_items[acc_ids[z]];
559 items_to_send_uid[z] = item.uniqueId();
560 bool has_cell = item.hasCell();
561 items_to_send_cells_uid[z] = (has_cell) ? item.cell().uniqueId() : NULL_ITEM_UNIQUE_ID;
562#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
563#if 0
564 info() << "Particle BufID=" << acc_ids[z]
565 << " LID=" << item.localId()
566 << " UID=" << items_to_send_uid[z]
567 << " CellIUID=" << items_to_send_cells_uid[z]
568 << " (owner=" << item.cell().owner() << ")";
569#endif
570#endif
571 }
572 sbuf->putSpan(items_to_send_uid);
573 sbuf->putSpan(items_to_send_cells_uid);
574
575 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
576 IVariable* var = *i_var;
577 var->serialize(sbuf,acc_ids);
578 }
579
580 m_pending_messages.add(sm);
581}
582
583/*---------------------------------------------------------------------------*/
584/*---------------------------------------------------------------------------*/
585
586void NonBlockingParticleExchanger::
587_deserializeMessage(ISerializeMessage* message,
588 Int64Array& items_to_create_unique_id,
589 Int64Array& items_to_create_cells_id,
590 ItemGroup item_group,
591 Int32Array* new_particle_local_ids)
592{
593
594 IMesh* mesh = m_item_family->mesh();
595 ISerializer* sbuf = message->serializer();
596
597 // Indique qu'on souhaite sérialiser les données en lecture
598 sbuf->setMode(ISerializer::ModeGet);
599 sbuf->setReadMode(ISerializer::ReadReplace);
600 Int32UniqueArray items_to_create_local_id;
601 Int32UniqueArray cells_lid;
602
603 Integer message_type = sbuf->getInteger();
604 info(4) << "Deserialise message_type=" << (int)message_type;
605 switch(message_type){
606 case MESSAGE_EXCHANGE:
607 {
608 m_need_general_receive = true;
609 if (m_want_fast_send_particles){
610 Int64 nb_finished = sbuf->getInt64();
611 m_nb_particle_finished_exchange += nb_finished;
612 }
613 Int32 orig_rank = sbuf->getInt32();
614 Int64 nb_item = sbuf->getInt64();
615 if (m_is_debug)
616 info() << "------------- Receive: rank=" << orig_rank << " particle nb=" << nb_item
617 << " (orig_rank=" << message->destination() << ")";
618
619 //if (nb_item!=0)
620 //info() << "Receiving particules n=" << nb_item;
621 items_to_create_local_id.resize(nb_item);
622 items_to_create_unique_id.resize(nb_item);
623 items_to_create_cells_id.resize(nb_item);
624 sbuf->getSpan(items_to_create_unique_id);
625 sbuf->getSpan(items_to_create_cells_id);
626#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
627 //info() << "Recv from SID " << sync_infos[i].subDomain() << " N=" << nb_item;
628#if 0
629 for( Integer z=0; z<nb_item; ++z ){
630 info() << "Particle UID=" << items_to_create_unique_id[z]
631 << " CellIUID=" << items_to_create_cells_id[z];
632 }
633#endif
634#endif
635 cells_lid.resize(nb_item);
636 mesh->cellFamily()->itemsUniqueIdToLocalId(cells_lid,items_to_create_cells_id);
637
638 items_to_create_local_id.resize(nb_item);
639 ParticleVectorView particles_view = m_item_family->toParticleFamily()->addParticles(items_to_create_unique_id,
640 cells_lid,
641 items_to_create_local_id);
642 info(5) << "Nb create=" << particles_view.size();
643
644 // Notifie la famille qu'on a fini nos modifs.
645 // Après appel à cette méthode, les variables sont à nouveau utilisables
646 m_item_family->endUpdate();
647
648 // Converti les uniqueId() récupérée en localId() et pour les particules
649 // renseigne la maille correspondante
650 ParticleInfoListView internal_items(m_item_family);
651 //ItemInternalList internal_cells(mesh->itemsInternal(IK_Cell));
652 //m_item_family->itemsUniqueIdToLocalId(items_to_create_local_id,items_to_create_unique_id);
653
654 for( Integer z=0; z<nb_item; ++z ){
655 Particle item = internal_items[items_to_create_local_id[z]];
656 //item.setCell( internal_cells[cells_lid[z]] );
657 // Je suis le nouveau propriétaire (TODO: ne pas faire ici)
658 item.mutableItemBase().setOwner(m_rank,m_rank);
659 }
660 if (!item_group.null())
661 item_group.addItems(items_to_create_local_id,false);
662 if (new_particle_local_ids)
663 new_particle_local_ids->addRange(items_to_create_local_id);
664
665 for( VariableCollection::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
666 IVariable* var = *i_var;
667 var->serialize(sbuf,items_to_create_local_id);
668 }
669 }
670 break;
671 case MESSAGE_NB_FINISH_EXCHANGE:
672 {
673 m_need_general_receive = true;
674 // Indique qu'on peut continuer à recevoir des messages, celui-ci n'étant pas
675 // significatif
676 m_can_process_messages = true;
677 Int64 nb_particle = sbuf->getInt64();
678 Int32 orig_rank = sbuf->getInt32();
679 if (m_is_debug)
680 info() << "MESSAGE_NB_FINISH_EXCHANGE nb=" << nb_particle << " (from rank=" << orig_rank << ")";
681 _addFinishExchangeParticle(nb_particle);
682 }
683 break;
684 case MESSAGE_FINISH_EXCHANGE_STATUS:
685 {
686 m_nb_total_particle_finish_exchange = sbuf->getInt64();
687 m_exchange_finished = (m_nb_total_particle_finish_exchange==m_nb_total_particle);
688 //#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
689 info() << "** RECEIVING FINISH EXCHANGE " << m_exchange_finished
690 << " finish=" << m_nb_total_particle_finish_exchange
691 << " total=" << m_nb_total_particle;
692 //#endif
693 //if (m_exchange_finished)
694 //warning() << "Exchange finished ! " << m_current_iteration;
695 }
696 break;
697 case MESSAGE_CHANGE_BLOCKING:
698 {
699 m_need_general_receive = true;
700
701 Integer nb_blocking_size = sbuf->getInteger();
702 // Il faut être certain que le nouveau \a blocking_size
703 // est inférieur au courant, ce qui peut arriver lorsqu'on
704 // recoit plusieurs messages de ce type en même temps.
705 if (nb_blocking_size<m_nb_blocking_size)
706 m_nb_blocking_size = nb_blocking_size;
707 info(4) << "** RECEIVING CHANGE BLOCKING"
708 << " new_blocking_size=" << m_nb_blocking_size;
709 // Comme il peut y avoir des particules en attente, il faut
710 // maintenant les envoyer
711 if (m_waiting_local_ids.size()>0)
712 _generateSendItemsMessages(Int32ConstArrayView(),Int32ConstArrayView());
713 }
714 break;
715 }
716}
717
718/*---------------------------------------------------------------------------*/
719/*---------------------------------------------------------------------------*/
720
721void NonBlockingParticleExchanger::
722reset()
723{
724 if (!m_waiting_messages.empty())
725 ARCANE_FATAL("reset() waiting parallel requests");
726 _clearMessages();
727}
728
729/*---------------------------------------------------------------------------*/
730/*---------------------------------------------------------------------------*/
731
732void NonBlockingParticleExchanger::
733_processFinishTrackingMessage()
734{
735 // Si processeur == m_master_proc,
736 // - réceptionne des autres leur nombre fini.
737 // - fait le total
738 // - le renvoi à tout le monde
739 // Si processeur != m_master_proc
740 // - envoie à m_master_proc la valeur \a nb_finish_tracking_particle
741 // - réceptionne de m_master_proc le nombre total ayant terminé
742 if (m_rank==m_master_proc){
743 _addFinishExchangeParticle(m_nb_particle_finished_exchange);
744 }
745 else{
746 // Envoie le nombre de particules au processeur \a master_proc
747 if (m_nb_particle_finished_exchange!=0) {
748 info(4) << "Send to master proc (" << m_master_proc << ") nb_finish=" << m_nb_particle_finished_exchange;
749 SerializeMessage* sm = new SerializeMessage(m_rank,m_master_proc,ISerializeMessage::MT_Send);
750 ISerializer* sbuf = sm->serializer();
751 sbuf->setMode(ISerializer::ModeReserve);
752 sbuf->reserveInteger(1);
753 sbuf->reserveInt64(1);
754 sbuf->reserveInt32(1);
755 sbuf->allocateBuffer();
756 sbuf->setMode(ISerializer::ModePut);
757 sbuf->putInteger(MESSAGE_NB_FINISH_EXCHANGE);
758 sbuf->putInt64(m_nb_particle_finished_exchange);
759 sbuf->putInt32(m_rank);
760 m_pending_messages.add(sm);
761 }
762 }
763 m_nb_particle_finished_exchange = 0;
764}
765
766/*---------------------------------------------------------------------------*/
767/*---------------------------------------------------------------------------*/
768
769void NonBlockingParticleExchanger::
770_sendFinishExchangeParticle()
771{
772 Int32 nb_rank = m_parallel_mng->commSize();
773 if (m_rank!=m_master_proc || m_end_message_sended)
774 return;
775 m_end_message_sended = true;
776 info(4) << " ** ** SEND FINISH EXCHANGE PARTICLE2";
777 for( Integer i=0; i<nb_rank; ++i ){
778 if (i==m_master_proc)
779 continue;
780 SerializeMessage* sm = new SerializeMessage(m_rank,i,ISerializeMessage::MT_Send);
781 ISerializer* sbuf = sm->serializer();
782 sbuf->setMode(ISerializer::ModeReserve);
783 sbuf->reserveInteger(1);
784 sbuf->reserveInt64(1);
785 sbuf->allocateBuffer();
786 sbuf->setMode(ISerializer::ModePut);
787 sbuf->putInteger(MESSAGE_FINISH_EXCHANGE_STATUS);
788 sbuf->putInt64(m_nb_total_particle_finish_exchange);
789 m_pending_messages.add(sm);
790 }
791}
792
793/*---------------------------------------------------------------------------*/
794/*---------------------------------------------------------------------------*/
795
796void NonBlockingParticleExchanger::
797_addFinishExchangeParticle(Int64 nb_particle_finish_exchange)
798{
799 m_nb_total_particle_finish_exchange += nb_particle_finish_exchange;
800 Int32 nb_rank = m_parallel_mng->commSize();
801 Int64 nb_rank_as_int64 = nb_rank;
802 //#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
803 info(4) << "** RECEIVING FINISH EXCHANGE n=" << nb_particle_finish_exchange
804 << " totalfinish=" << m_nb_total_particle_finish_exchange
805 << " total=" << m_nb_total_particle;
806 //#endif
807 Int64 remaining_particle = m_nb_total_particle - m_nb_total_particle_finish_exchange;
808 if (remaining_particle==0){
809 m_exchange_finished = true;
810 m_need_general_receive = false;
811 info() << "** ** FINISH TRACKING NB_RECV=" << m_nb_receive_message
812 << " (Date=" << platform::getCurrentDateTime() << ")";
813 _sendFinishExchangeParticle();
814 }
815 else if (remaining_particle<(m_nb_blocking_size*nb_rank_as_int64)){
816 //Integer nb_rank = subDomain()->nbSubDomain();
817 //m_nb_blocking_size /= 100;
818 m_nb_blocking_size = 0;
819 warning() << "** ** CHANGE BLOCKING NEW_SIZE " << m_nb_blocking_size
820 << " REMAING_PARTICLE " << remaining_particle
821 << " (Date=" << platform::getCurrentDateTime() << ")";
822
823 // Comme il peut y avoir des particules en attente, il faut
824 // maintenant les envoyer
825 if (m_waiting_local_ids.size()>0)
826 _generateSendItemsMessages(Int32ConstArrayView(),Int32ConstArrayView());
827 for( Int32 i=0; i<nb_rank; ++i ){
828 if (i==m_master_proc)
829 continue;
830 SerializeMessage* sm = new SerializeMessage(m_rank,i,ISerializeMessage::MT_Send);
831 ISerializer* sbuf = sm->serializer();
832 sbuf->setMode(ISerializer::ModeReserve);
833 sbuf->reserveInteger(1);
834 sbuf->reserveInteger(1);
835 sbuf->allocateBuffer();
836 sbuf->setMode(ISerializer::ModePut);
837 sbuf->putInteger(MESSAGE_CHANGE_BLOCKING);
838 sbuf->putInteger(m_nb_blocking_size);
839 m_pending_messages.add(sm);
840 }
841 }
842}
843
844
845/*---------------------------------------------------------------------------*/
846/*---------------------------------------------------------------------------*/
847
848void NonBlockingParticleExchanger::
849_checkInitialized()
850{
851 if (!m_item_family)
852 ARCANE_FATAL("method initialized() not called");
853 _clearMessages();
854}
855
856/*---------------------------------------------------------------------------*/
857/*---------------------------------------------------------------------------*/
858
859ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(NonBlockingParticleExchanger,IParticleExchanger,
860 NonBlockingParticleExchanger);
861
862/*---------------------------------------------------------------------------*/
863/*---------------------------------------------------------------------------*/
864
865} // End namespace Arcane::mesh
866
867/*---------------------------------------------------------------------------*/
868/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
Tableau d'items de types quelconques.
Interface d'une famille d'entités.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Ref< ISerializeMessageList > createSerializeMessageListRef()=0
Créé une liste pour gérer les 'ISerializeMessage'.
virtual char reduce(eReduceType rt, char v)=0
Effectue la réduction de type rt sur le réel v et retourne la valeur.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:149
Vue constante d'un tableau de type T.
ItemVectorViewT< Particle > ParticleVectorView
Vue sur un vecteur de particules.
Definition ItemTypes.h:309
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:550
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:693
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:552
Int32 Integer
Type représentant un entier.