Arcane  v3.14.10.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
NonBlockingParticleExchanger.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2023 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* NonBlockingParticleExchanger.cc (C) 2000-2023 */
9/* */
10/* Echangeur de particules. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/mesh/NonBlockingParticleExchanger.h"
15
16#include "arcane/utils/List.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/IFunctor.h"
19#include "arcane/utils/PlatformUtils.h"
20
21#include "arcane/ItemGroup.h"
22#include "arcane/ItemVector.h"
23#include "arcane/IItemFamily.h"
24#include "arcane/IParticleFamily.h"
25#include "arcane/IParallelMng.h"
26#include "arcane/IVariableMng.h"
27#include "arcane/IVariable.h"
28#include "arcane/IMesh.h"
29#include "arcane/Item.h"
30#include "arcane/Timer.h"
31#include "arcane/SerializeMessage.h"
32#include "arcane/ISerializeMessageList.h"
33#include "arcane/CommonVariables.h"
34#include "arcane/Timer.h"
35#include "arcane/FactoryService.h"
36
37//#define ARCANE_DEBUG_EXCHANGE_ITEMS
38
39/*---------------------------------------------------------------------------*/
40/*---------------------------------------------------------------------------*/
41
42namespace Arcane::mesh
43{
44
45/*---------------------------------------------------------------------------*/
46/*---------------------------------------------------------------------------*/
47
48NonBlockingParticleExchanger::
49NonBlockingParticleExchanger(const ServiceBuildInfo& sbi)
50: BasicService(sbi)
51, m_item_family(nullptr)
52, m_parallel_mng(sbi.mesh()->parallelMng())
53, m_rank(m_parallel_mng->commRank())
54, m_timer(new Timer(m_parallel_mng->timerMng(),"NonBlockingParticleExchanger",Timer::TimerReal))
55, m_total_time_functor(0.)
56, m_total_time_waiting(0.)
57, m_nb_total_particle_finish_exchange(0)
58, m_nb_total_particle(0)
59, m_nb_original_blocking_size(0)
60, m_nb_blocking_size(m_nb_original_blocking_size)
61, m_exchange_finished(true)
62, m_master_proc(0)
63, m_need_general_receive(false)
64, m_end_message_sended(false)
65, m_can_process_messages(true)
66, m_can_process_non_blocking(false)
67, m_want_process_non_blocking(false)
68, m_want_fast_send_particles(true)
69, m_nb_receive_message(0)
70, m_nb_particle_finished_exchange(0)
71, m_verbose_level(1)
72, m_is_debug(false)
73{
74 // m_want_fast_send_particles permet d'envoyer en même temps que
75 // les particules le nombre qui ont finie la poursuite. Cela permet
76 // d'éviter des messages supplémentaires.
77#if ARCANE_DEBUG_EXCHANGE_ITEMS
78 m_is_debug = true;
79#endif
80}
81
82/*---------------------------------------------------------------------------*/
83/*---------------------------------------------------------------------------*/
84
85NonBlockingParticleExchanger::
86~NonBlockingParticleExchanger()
87{
88 // On ne peut pas faire de throw dans le destructeur donc on affiche
89 // des avertissements de compilation
90 if (!m_pending_messages.empty() || !m_waiting_messages.empty()){
91 String s = String::format("pending or waiting messages: nb_pending={0} nb_waiting=",
92 m_pending_messages.size(),m_waiting_messages.size());
93 warning() << s;
94 }
95
96 if (!m_waiting_local_ids.empty()){
97 warning() << String::format("pending particles: nb_pending=",m_waiting_local_ids.size());
98 }
99
100 delete m_timer;
101}
102
103/*---------------------------------------------------------------------------*/
104/*---------------------------------------------------------------------------*/
105
106void NonBlockingParticleExchanger::
107initialize(IItemFamily* item_family)
108{
109 m_item_family = item_family;
110}
111
112/*---------------------------------------------------------------------------*/
113/*---------------------------------------------------------------------------*/
114
115void NonBlockingParticleExchanger::
116_clearMessages()
117{
118 for( Integer i=0, is=m_accumulate_infos.size(); i<is; ++i ){
119 delete m_accumulate_infos[i];
120 m_accumulate_infos[i] = 0;
121 }
122 m_accumulate_infos.clear();
123}
124
125/*---------------------------------------------------------------------------*/
126/*---------------------------------------------------------------------------*/
127
128void NonBlockingParticleExchanger::
129beginNewExchange(Integer i_nb_particle)
130{
131 _checkInitialized();
132 IParallelMng* pm = m_parallel_mng;
133
134 m_end_message_sended = false;
135 m_exchange_finished = false;
136 m_nb_blocking_size = m_nb_original_blocking_size;
137 m_nb_receive_message = 0;
138 m_nb_particle_finished_exchange = 0;
139
140 //TODO: utiliser un tag spécifique pour cet échange.
142 m_nb_total_particle = pm->reduce(Parallel::ReduceSum,nb_particle);
143 info() << "BEGIN TRACKING TOTAL FLYING = " << m_nb_total_particle
144 << " (local=" << nb_particle << ") "
145 << " (Date=" << platform::getCurrentDateTime() << ")";
146
147 m_nb_total_particle_finish_exchange = 0;
148
149 m_need_general_receive = true;
150
151 // Récupère la liste des variables à transferer.
152 // Il s'agit des variables qui ont la même famille que celle passée
153 // en paramètre.
154 // IMPORTANT: tous les sous-domaines doivent avoir ces mêmes variables
155 m_variables_to_exchange.clear();
156 m_item_family->usedVariables(m_variables_to_exchange);
157 m_variables_to_exchange.sortByName(true);
158}
159
160/*---------------------------------------------------------------------------*/
161/*---------------------------------------------------------------------------*/
162
// Adds nb_particle_finish_exchange to m_nb_particle_finished_exchange, then
// forwards to _exchangeItems() with the item group and a null list of new
// particle local ids. Returns the value returned by _exchangeItems().
// NOTE(review): the parameter lines declaring local_ids, sub_domains_to_send
// and item_group are missing from this listing - confirm against the header.
163bool NonBlockingParticleExchanger::
164exchangeItems(Integer nb_particle_finish_exchange,
167 IFunctor* functor)
168{
169 //TODO remove if this goes through sendItems()
170 m_nb_particle_finished_exchange += nb_particle_finish_exchange;
171 return _exchangeItems(local_ids,sub_domains_to_send,item_group,0,functor);
172}
173
174/*---------------------------------------------------------------------------*/
175/*---------------------------------------------------------------------------*/
176
// Adds nb_particle_finish_exchange to m_nb_particle_finished_exchange, then
// forwards to _exchangeItems() with a null item group and the list receiving
// the local ids of the new particles. Returns the value returned by
// _exchangeItems().
// NOTE(review): the parameter lines declaring local_ids, sub_domains_to_send
// and new_particle_local_ids are missing from this listing - confirm against
// the header.
177bool NonBlockingParticleExchanger::
178exchangeItems(Integer nb_particle_finish_exchange,
182 IFunctor* functor)
183{
184 //TODO remove if this goes through sendItems()
185 m_nb_particle_finished_exchange += nb_particle_finish_exchange;
186 return _exchangeItems(local_ids,sub_domains_to_send,ItemGroup(),new_particle_local_ids,functor);
187}
188
189/*---------------------------------------------------------------------------*/
190/*---------------------------------------------------------------------------*/
191
// Adds nb_particle_finish_exchange to m_nb_particle_finished_exchange, hands
// the particles to _checkSendItems() (which either buffers them or generates
// the send messages) and then calls _sendPendingMessages().
// NOTE(review): the parameter lines declaring local_ids and
// sub_domains_to_send are missing from this listing - confirm against the
// header.
192void NonBlockingParticleExchanger::
193sendItems(Integer nb_particle_finish_exchange,
196{
197 m_nb_particle_finished_exchange += nb_particle_finish_exchange;
198 _checkSendItems(local_ids,sub_domains_to_send);
199 _sendPendingMessages();
200}
201
202/*---------------------------------------------------------------------------*/
203/*---------------------------------------------------------------------------*/
204
// Thin wrapper forwarding to _waitMessages() with a null item group; returns
// the value returned by _waitMessages().
// NOTE(review): the line holding the method name and its parameters
// (nb_pending_particle, new_particle_local_ids, functor) is missing from this
// listing - confirm against the header.
205bool NonBlockingParticleExchanger::
207{
208 return _waitMessages(nb_pending_particle,ItemGroup(),new_particle_local_ids,functor);
209}
210
211/*---------------------------------------------------------------------------*/
212/*---------------------------------------------------------------------------*/
213
// Decides whether the particles are sent now or buffered.
// When the number of given particles plus the number of already buffered ones
// reaches m_nb_blocking_size, the send messages are generated immediately
// (_generateSendItemsMessages() also sends the buffered particles). Otherwise
// the local ids and their destination ranks are appended to
// m_waiting_local_ids and m_waiting_sub_domains_to_send.
// NOTE(review): the parameter line declaring sub_domains_to_send is missing
// from this listing - confirm against the header.
214void NonBlockingParticleExchanger::
215_checkSendItems(Int32ConstArrayView local_ids,
217{
218 Integer nb_particle = local_ids.size();
219 Integer nb_waiting_local_ids = m_waiting_local_ids.size();
220 Integer nb_waiting_sub_domains_to_send = m_waiting_sub_domains_to_send.size();
221 if ((nb_particle+nb_waiting_local_ids)>=m_nb_blocking_size){
222 _generateSendItemsMessages(local_ids,sub_domains_to_send);
223 }
224 else{
225 // Store the particles in a buffer before sending them
226 m_waiting_local_ids.resize(nb_waiting_local_ids+nb_particle);
227 m_waiting_sub_domains_to_send.resize(nb_waiting_sub_domains_to_send+nb_particle);
228 for( Integer i=0; i<nb_particle; ++i ){
229 m_waiting_local_ids[nb_waiting_local_ids+i] = local_ids[i];
230 m_waiting_sub_domains_to_send[nb_waiting_sub_domains_to_send+i] = sub_domains_to_send[i];
231 }
232 }
233}
234
235/*---------------------------------------------------------------------------*/
236/*---------------------------------------------------------------------------*/
237
238bool NonBlockingParticleExchanger::
239_exchangeItems(Int32ConstArrayView local_ids,
240 Int32ConstArrayView sub_domains_to_send,ItemGroup item_group,
241 Int32Array* new_particle_local_ids,
242 IFunctor* functor)
243{
244 if (m_want_fast_send_particles){
245 if (local_ids.empty())
246 _processFinishTrackingMessage();
247 }
248 else
249 if (!m_want_process_non_blocking)
250 _processFinishTrackingMessage();
251 if (m_exchange_finished){
252 _sendFinishExchangeParticle();
253 m_need_general_receive = false;
254 }
255 _checkNeedReceiveMessage();
256
257 _checkSendItems(local_ids,sub_domains_to_send);
258
259 return _waitMessages(0,item_group,new_particle_local_ids,functor);
260}
261
262/*---------------------------------------------------------------------------*/
263/*---------------------------------------------------------------------------*/
264
265bool NonBlockingParticleExchanger::
266_waitMessages(Integer nb_pending_particle,ItemGroup item_group,
267 Int32Array* new_particle_local_ids,IFunctor* functor)
268{
269 ARCANE_UNUSED(nb_pending_particle);
270
271 if (!item_group.null())
272 item_group.clear();
273
274 m_can_process_messages = true;
275 m_can_process_non_blocking = m_want_process_non_blocking;
276 while (m_can_process_messages && !m_exchange_finished){
277 m_can_process_messages = false;
278 _processMessages(item_group,new_particle_local_ids,false,functor);
279 }
280
281 if (m_exchange_finished){
282 info(5) << " ** EXCHANGE finished: ";
283 // Cela est nécessaire pour être certain que les messages
284 // de fin sont bien tous réceptionnés
285 _processMessages(item_group,new_particle_local_ids,true,0);
286 info(5) << " ** EXCHANGE finished END: ";
287 }
288
289 info(5) << " ** RETURN EXCHANGE m_exchange_finished: " << m_exchange_finished;
290 return m_exchange_finished;
291}
292
293/*---------------------------------------------------------------------------*/
294/*---------------------------------------------------------------------------*/
295
296void NonBlockingParticleExchanger::
297_checkNeedReceiveMessage()
298{
299 if (m_need_general_receive){
300 auto sm = new SerializeMessage(m_rank,A_NULL_RANK,ISerializeMessage::MT_Recv);
301 m_pending_messages.add(sm);
302 m_need_general_receive = false;
303 }
304}
305
306/*---------------------------------------------------------------------------*/
307/*---------------------------------------------------------------------------*/
308
309void NonBlockingParticleExchanger::
310_generateSendItemsMessages(Int32ConstArrayView local_ids,
311 Int32ConstArrayView sub_domains_to_send)
312{
313 Timer::Phase tphase(m_parallel_mng->timeStats(),TP_Communication);
314
315 IMesh* mesh = m_item_family->mesh();
316
317 Int32UniqueArray communicating_sub_domains;
318 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
319
320 Integer nb_connected_sub_domain = communicating_sub_domains.size();
321 //Integer max_sub_domain_id = 0;
322 UniqueArray< SharedArray<Int32> > ids_to_send(nb_connected_sub_domain);
323 // Infos pour chaque sous-domaine connecté
324 //_clearMessages();
325 m_accumulate_infos.clear();
326 m_accumulate_infos.resize(nb_connected_sub_domain);
327 for( Integer i=0; i<nb_connected_sub_domain; ++i )
328 m_accumulate_infos[i] = new SerializeMessage(m_rank,communicating_sub_domains[i],
329 ISerializeMessage::MT_Send);
330
331 _addItemsToSend(local_ids,sub_domains_to_send,communicating_sub_domains,ids_to_send);
332 _addItemsToSend(m_waiting_local_ids,m_waiting_sub_domains_to_send,
333 communicating_sub_domains,ids_to_send);
334
335
336 if (m_is_debug){
337 info() << "-- Subdomain " << m_rank << ". NB to send: " << local_ids.size()
338 << " NB connected subdomains: " << nb_connected_sub_domain;
339
340 info() << "NB connected subdomain for " << m_rank << " : " << m_accumulate_infos.size();
341 for( Integer i=0, n=m_accumulate_infos.size(); i<n; ++i ){
342 info() << "------------- Send: rank=" << m_accumulate_infos[i]->destRank()
343 << " n=" << ids_to_send[i].size();
344 }
345 }
346
347 Int64UniqueArray items_to_send_uid;
348 Int64UniqueArray items_to_send_cells_uid; // Uniquement pour les particules;
349
350 for( Integer j=0; j<nb_connected_sub_domain; ++j ){
351 ISerializeMessage* sm = m_accumulate_infos[j];
352 // En mode bloquant, envoie toujours le message car le destinataire a posté
353 // un message de réception. Sinon, le message n'a besoin d'être envoyé que
354 // s'il contient des particules.
355 if (!ids_to_send[j].empty())
356 _serializeMessage(sm,ids_to_send[j],items_to_send_uid,
357 items_to_send_cells_uid);
358 else
359 // Le message n'est pas utile car vide.
360 delete sm;
361 }
362
363 m_accumulate_infos.clear();
364
365 // Détruit les entités qui viennent d'être envoyées
366 info(5)<<"NonBlockingParticleExchanger:: sendItems " << "local_ids "<<local_ids.size();
367 info(5)<<"NonBlockingParticleExchanger:: sendItems " << "m_waiting_local_ids "<<m_waiting_local_ids.size();
368
369 m_item_family->toParticleFamily()->removeParticles(local_ids);
370 m_item_family->toParticleFamily()->removeParticles(m_waiting_local_ids);
371 m_item_family->endUpdate();
372 m_waiting_local_ids.clear();
373 m_waiting_sub_domains_to_send.clear();
374}
375
376/*---------------------------------------------------------------------------*/
377/*---------------------------------------------------------------------------*/
378
379void NonBlockingParticleExchanger::
380_addItemsToSend(Int32ConstArrayView local_ids,
381 Int32ConstArrayView sub_domains_to_send,
382 Int32ConstArrayView communicating_sub_domains,
383 UniqueArray<SharedArray<Int32> >& ids_to_send)
384{
385 String func_name("NonBlockingParticleExchanger::_addItemsToSend()");
386 Integer nb_connected_sub_domain = ids_to_send.size();
387 // Cherche pour chaque élément à quel sous-domaine il doit être transféré.
388 // Cette recherche se fait en se basant sur les \a local_ids
389 Integer id_size = local_ids.size();
390 for( Integer i=0; i<id_size; ++i ){
391 Int32 item_local_id = local_ids[i];
392 Integer sd_to_send = sub_domains_to_send[i];
393#ifdef ARCANE_CHECK
394 if (sd_to_send==m_rank)
395 // Il s'agit d'une entité propre à ce sous-domaine
396 fatal() << func_name << "The entity with local id " << item_local_id
397 << " should not be sent to its own subdomain";
398#endif
399 // Recherche l'index du sous-domaine auquel l'entité appartient
400 // dans la liste \a sync_list
401 // TODO: utiliser une table indirect (tableau alloué au nombre de sous-domaines)
402 Integer sd_index = nb_connected_sub_domain;
403 for( Integer i_sd=0; i_sd<nb_connected_sub_domain; ++i_sd )
404 if (sd_to_send==communicating_sub_domains[i_sd]){
405 sd_index = i_sd;
406 break;
407 }
408#ifdef ARCANE_CHECK
409 if (sd_index==nb_connected_sub_domain)
410 fatal() << func_name << "Internal: bad subdomain index";
411#endif
412 ids_to_send[sd_index].add(item_local_id);
413 }
414}
415
416/*---------------------------------------------------------------------------*/
417/*---------------------------------------------------------------------------*/
418
419void NonBlockingParticleExchanger::
420_processMessages(ItemGroup item_group,Int32Array* new_particle_local_ids,
421 bool wait_all,IFunctor* functor)
422{
423 _sendPendingMessages();
424
425 if (functor){
426 {
427 Timer::Sentry ts(m_timer);
428 functor->executeFunctor();
429 }
430 m_total_time_functor += m_timer->lastActivationTime();
431 info(5) << "TimeFunctor: current=" << m_timer->lastActivationTime()
432 << " total=" << m_total_time_functor;
433 }
434
435 Integer nb_message_finished = 0;
436 {
437 Timer::Sentry ts(m_timer);
438 if (wait_all)
439 nb_message_finished = m_message_list->waitMessages(Parallel::WaitAll);
440 else{
441 if (m_can_process_non_blocking)
442 nb_message_finished = m_message_list->waitMessages(Parallel::WaitSomeNonBlocking);
443 else
444 nb_message_finished = m_message_list->waitMessages(Parallel::WaitSome);
445 //info() << "Nb finished=" << nb_message_finished << " is_block=" << m_can_process_non_blocking;
446 if (nb_message_finished==0){
447 m_can_process_non_blocking = false;
448 m_can_process_messages = true;
449 _processFinishTrackingMessage();
450 return;
451 }
452 }
453 }
454 m_total_time_waiting += m_timer->lastActivationTime();
455 info(5) << "TimeWaiting: current=" << m_timer->lastActivationTime()
456 << " total=" << m_total_time_waiting;
457
458 // Sauve les communications actuellement traitées car le traitement
459 // peut en ajouter de nouvelles
460 UniqueArray<ISerializeMessage*> current_messages(m_waiting_messages);
461 m_waiting_messages.clear();
462
463 Int64UniqueArray items_to_create_id;
464 Int64UniqueArray items_to_create_cells_id;
465 for( Integer i=0, is=current_messages.size(); i<is; ++i ){
466 ISerializeMessage* sm = current_messages[i];
467 if (sm->finished()){
468 if (!sm->isSend()){
469 _deserializeMessage(sm,items_to_create_id,items_to_create_cells_id,item_group,new_particle_local_ids);
470 ++m_nb_receive_message;
471 }
472 delete sm;
473 }
474 else
475 m_waiting_messages.add(sm);
476 }
477}
478
479/*---------------------------------------------------------------------------*/
480/*---------------------------------------------------------------------------*/
481
482void NonBlockingParticleExchanger::
483_sendPendingMessages()
484{
485 IParallelMng* pm = m_parallel_mng;
486
487 _checkNeedReceiveMessage();
488
489 if (!m_message_list.get())
490 m_message_list = pm->createSerializeMessageListRef();
491
492 {
493 Timer::Sentry ts(m_timer);
494 // Ajoute les messages en attente de traitement
495 Integer nb_message = m_pending_messages.size();
496 for( Integer i=0; i<nb_message; ++i ){
497 m_message_list->addMessage(m_pending_messages[i]);
498 m_waiting_messages.add(m_pending_messages[i]);
499 }
500 m_message_list->processPendingMessages();
501 m_pending_messages.clear();
502 }
503 info(5) << "TimeSendMessages=" << m_timer->lastActivationTime()
504 << " buffersize=" << m_waiting_local_ids.size();
505}
506
507/*---------------------------------------------------------------------------*/
508/*---------------------------------------------------------------------------*/
509
510void NonBlockingParticleExchanger::
511_serializeMessage(ISerializeMessage* sm,
512 Int32ConstArrayView acc_ids,
513 Int64Array& items_to_send_uid,
514 Int64Array& items_to_send_cells_uid)
515{
516 ParticleInfoListView internal_items(m_item_family);
517
518 ISerializer* sbuf = sm->serializer();
519 sbuf->setMode(ISerializer::ModeReserve);
520
521 //for( Integer j=0; j<nb_connected_sub_domain; ++j ){
522 //ConstArrayView<Integer> acc_ids = m_ids_to_send[j];
523 Integer nb_item = acc_ids.size();
524 // Réserve pour le type de message
525 sbuf->reserveInteger(1);
526 if (m_want_fast_send_particles){
527 // Réserve pour le nombre de particules traitées
528 sbuf->reserve(DT_Int64,1);
529 }
530 // Réserve pour le rang de l'expéditeur
531 sbuf->reserve(DT_Int32,1);
532 // Réserve pour le nombre de uniqueId()
533 sbuf->reserve(DT_Int64,1);
534 // Réserve pour les uniqueId() des particules
535 sbuf->reserveSpan(DT_Int64,nb_item);
536 // Réserve pour les uniqueId() des mailles dans lesquelles se trouvent les particules
537 sbuf->reserveSpan(DT_Int64,nb_item);
538
539 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
540 IVariable* var = *i_var;
541 var->serialize(sbuf,acc_ids);
542 }
543
544 // Sérialise les données en écriture
545 sbuf->allocateBuffer();
546 sbuf->setMode(ISerializer::ModePut);
547
548 sbuf->putInteger(MESSAGE_EXCHANGE);
549 if (m_want_fast_send_particles){
550 sbuf->putInt64(m_nb_particle_finished_exchange);
551 m_nb_particle_finished_exchange = 0;
552 }
553 sbuf->putInt32(m_rank);
554 sbuf->putInt64(nb_item);
555 items_to_send_uid.resize(nb_item);
556 items_to_send_cells_uid.resize(nb_item);
557
558 for( Integer z=0; z<nb_item; ++z ){
559 Particle item = internal_items[acc_ids[z]];
560 items_to_send_uid[z] = item.uniqueId();
561 bool has_cell = item.hasCell();
562 items_to_send_cells_uid[z] = (has_cell) ? item.cell().uniqueId() : NULL_ITEM_UNIQUE_ID;
563#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
564#if 0
565 info() << "Particle BufID=" << acc_ids[z]
566 << " LID=" << item.localId()
567 << " UID=" << items_to_send_uid[z]
568 << " CellIUID=" << items_to_send_cells_uid[z]
569 << " (owner=" << item.cell().owner() << ")";
570#endif
571#endif
572 }
573 sbuf->putSpan(items_to_send_uid);
574 sbuf->putSpan(items_to_send_cells_uid);
575
576 for( VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
577 IVariable* var = *i_var;
578 var->serialize(sbuf,acc_ids);
579 }
580
581 m_pending_messages.add(sm);
582}
583
584/*---------------------------------------------------------------------------*/
585/*---------------------------------------------------------------------------*/
586
587void NonBlockingParticleExchanger::
588_deserializeMessage(ISerializeMessage* message,
589 Int64Array& items_to_create_unique_id,
590 Int64Array& items_to_create_cells_id,
591 ItemGroup item_group,
592 Int32Array* new_particle_local_ids)
593{
594
595 IMesh* mesh = m_item_family->mesh();
596 ISerializer* sbuf = message->serializer();
597
598 // Indique qu'on souhaite sérialiser les données en lecture
599 sbuf->setMode(ISerializer::ModeGet);
600 sbuf->setReadMode(ISerializer::ReadReplace);
601 Int32UniqueArray items_to_create_local_id;
602 Int32UniqueArray cells_lid;
603
604 Integer message_type = sbuf->getInteger();
605 info(4) << "Deserialise message_type=" << (int)message_type;
606 switch(message_type){
607 case MESSAGE_EXCHANGE:
608 {
609 m_need_general_receive = true;
610 if (m_want_fast_send_particles){
611 Int64 nb_finished = sbuf->getInt64();
612 m_nb_particle_finished_exchange += nb_finished;
613 }
614 Int32 orig_rank = sbuf->getInt32();
615 Int64 nb_item = sbuf->getInt64();
616 if (m_is_debug)
617 info() << "------------- Receive: rank=" << orig_rank << " particle nb=" << nb_item
618 << " (orig_rank=" << message->destination() << ")";
619
620 //if (nb_item!=0)
621 //info() << "Receiving particules n=" << nb_item;
622 items_to_create_local_id.resize(nb_item);
623 items_to_create_unique_id.resize(nb_item);
624 items_to_create_cells_id.resize(nb_item);
625 sbuf->getSpan(items_to_create_unique_id);
626 sbuf->getSpan(items_to_create_cells_id);
627#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
628 //info() << "Recv from SID " << sync_infos[i].subDomain() << " N=" << nb_item;
629#if 0
630 for( Integer z=0; z<nb_item; ++z ){
631 info() << "Particle UID=" << items_to_create_unique_id[z]
632 << " CellIUID=" << items_to_create_cells_id[z];
633 }
634#endif
635#endif
636 cells_lid.resize(nb_item);
637 mesh->cellFamily()->itemsUniqueIdToLocalId(cells_lid,items_to_create_cells_id);
638
639 items_to_create_local_id.resize(nb_item);
640 ParticleVectorView particles_view = m_item_family->toParticleFamily()->addParticles(items_to_create_unique_id,
641 cells_lid,
642 items_to_create_local_id);
643 info(5) << "Nb create=" << particles_view.size();
644
645 // Notifie la famille qu'on a fini nos modifs.
646 // Après appel à cette méthode, les variables sont à nouveau utilisables
647 m_item_family->endUpdate();
648
649 // Converti les uniqueId() récupérée en localId() et pour les particules
650 // renseigne la maille correspondante
651 ParticleInfoListView internal_items(m_item_family);
652 //ItemInternalList internal_cells(mesh->itemsInternal(IK_Cell));
653 //m_item_family->itemsUniqueIdToLocalId(items_to_create_local_id,items_to_create_unique_id);
654
655 for( Integer z=0; z<nb_item; ++z ){
656 Particle item = internal_items[items_to_create_local_id[z]];
657 //item.setCell( internal_cells[cells_lid[z]] );
658 // Je suis le nouveau propriétaire (TODO: ne pas faire ici)
659 item.mutableItemBase().setOwner(m_rank,m_rank);
660 }
661 if (!item_group.null())
662 item_group.addItems(items_to_create_local_id,false);
663 if (new_particle_local_ids)
664 new_particle_local_ids->addRange(items_to_create_local_id);
665
666 for( VariableCollection::Enumerator i_var(m_variables_to_exchange); ++i_var; ){
667 IVariable* var = *i_var;
668 var->serialize(sbuf,items_to_create_local_id);
669 }
670 }
671 break;
672 case MESSAGE_NB_FINISH_EXCHANGE:
673 {
674 m_need_general_receive = true;
675 // Indique qu'on peut continuer à recevoir des messages, celui-ci n'étant pas
676 // significatif
677 m_can_process_messages = true;
678 Int64 nb_particle = sbuf->getInt64();
679 Int32 orig_rank = sbuf->getInt32();
680 if (m_is_debug)
681 info() << "MESSAGE_NB_FINISH_EXCHANGE nb=" << nb_particle << " (from rank=" << orig_rank << ")";
682 _addFinishExchangeParticle(nb_particle);
683 }
684 break;
685 case MESSAGE_FINISH_EXCHANGE_STATUS:
686 {
687 m_nb_total_particle_finish_exchange = sbuf->getInt64();
688 m_exchange_finished = (m_nb_total_particle_finish_exchange==m_nb_total_particle);
689 //#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
690 info() << "** RECEIVING FINISH EXCHANGE " << m_exchange_finished
691 << " finish=" << m_nb_total_particle_finish_exchange
692 << " total=" << m_nb_total_particle;
693 //#endif
694 //if (m_exchange_finished)
695 //warning() << "Exchange finished ! " << m_current_iteration;
696 }
697 break;
698 case MESSAGE_CHANGE_BLOCKING:
699 {
700 m_need_general_receive = true;
701
702 Integer nb_blocking_size = sbuf->getInteger();
703 // Il faut être certain que le nouveau \a blocking_size
704 // est inférieur au courant, ce qui peut arriver lorsqu'on
705 // recoit plusieurs messages de ce type en même temps.
706 if (nb_blocking_size<m_nb_blocking_size)
707 m_nb_blocking_size = nb_blocking_size;
708 info(4) << "** RECEIVING CHANGE BLOCKING"
709 << " new_blocking_size=" << m_nb_blocking_size;
710 // Comme il peut y avoir des particules en attente, il faut
711 // maintenant les envoyer
712 if (m_waiting_local_ids.size()>0)
713 _generateSendItemsMessages(Int32ConstArrayView(),Int32ConstArrayView());
714 }
715 break;
716 }
717}
718
719/*---------------------------------------------------------------------------*/
720/*---------------------------------------------------------------------------*/
721
722void NonBlockingParticleExchanger::
723reset()
724{
725 if (!m_waiting_messages.empty())
726 ARCANE_FATAL("reset() waiting parallel requests");
727 _clearMessages();
728}
729
730/*---------------------------------------------------------------------------*/
731/*---------------------------------------------------------------------------*/
732
733void NonBlockingParticleExchanger::
734_processFinishTrackingMessage()
735{
736 // Si processeur == m_master_proc,
737 // - réceptionne des autres leur nombre fini.
738 // - fait le total
739 // - le renvoi à tout le monde
740 // Si processeur != m_master_proc
741 // - envoie à m_master_proc la valeur \a nb_finish_tracking_particle
742 // - réceptionne de m_master_proc le nombre total ayant terminé
743 if (m_rank==m_master_proc){
744 _addFinishExchangeParticle(m_nb_particle_finished_exchange);
745 }
746 else{
747 // Envoie le nombre de particules au processeur \a master_proc
748 if (m_nb_particle_finished_exchange!=0) {
749 info(4) << "Send to master proc (" << m_master_proc << ") nb_finish=" << m_nb_particle_finished_exchange;
750 SerializeMessage* sm = new SerializeMessage(m_rank,m_master_proc,ISerializeMessage::MT_Send);
751 ISerializer* sbuf = sm->serializer();
752 sbuf->setMode(ISerializer::ModeReserve);
753 sbuf->reserveInteger(1);
754 sbuf->reserve(DT_Int64,1);
755 sbuf->reserve(DT_Int32,1);
756 sbuf->allocateBuffer();
757 sbuf->setMode(ISerializer::ModePut);
758 sbuf->putInteger(MESSAGE_NB_FINISH_EXCHANGE);
759 sbuf->putInt64(m_nb_particle_finished_exchange);
760 sbuf->putInt32(m_rank);
761 m_pending_messages.add(sm);
762 }
763 }
764 m_nb_particle_finished_exchange = 0;
765}
766
767/*---------------------------------------------------------------------------*/
768/*---------------------------------------------------------------------------*/
769
770void NonBlockingParticleExchanger::
771_sendFinishExchangeParticle()
772{
773 Int32 nb_rank = m_parallel_mng->commSize();
774 if (m_rank!=m_master_proc || m_end_message_sended)
775 return;
776 m_end_message_sended = true;
777 info(4) << " ** ** SEND FINISH EXCHANGE PARTICLE2";
778 for( Integer i=0; i<nb_rank; ++i ){
779 if (i==m_master_proc)
780 continue;
781 SerializeMessage* sm = new SerializeMessage(m_rank,i,ISerializeMessage::MT_Send);
782 ISerializer* sbuf = sm->serializer();
783 sbuf->setMode(ISerializer::ModeReserve);
784 sbuf->reserveInteger(1);
785 sbuf->reserve(DT_Int64,1);
786 sbuf->allocateBuffer();
787 sbuf->setMode(ISerializer::ModePut);
788 sbuf->putInteger(MESSAGE_FINISH_EXCHANGE_STATUS);
789 sbuf->putInt64(m_nb_total_particle_finish_exchange);
790 m_pending_messages.add(sm);
791 }
792}
793
794/*---------------------------------------------------------------------------*/
795/*---------------------------------------------------------------------------*/
796
797void NonBlockingParticleExchanger::
798_addFinishExchangeParticle(Int64 nb_particle_finish_exchange)
799{
800 m_nb_total_particle_finish_exchange += nb_particle_finish_exchange;
801 Int32 nb_rank = m_parallel_mng->commSize();
802 Int64 nb_rank_as_int64 = nb_rank;
803 //#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
804 info(4) << "** RECEIVING FINISH EXCHANGE n=" << nb_particle_finish_exchange
805 << " totalfinish=" << m_nb_total_particle_finish_exchange
806 << " total=" << m_nb_total_particle;
807 //#endif
808 Int64 remaining_particle = m_nb_total_particle - m_nb_total_particle_finish_exchange;
809 if (remaining_particle==0){
810 m_exchange_finished = true;
811 m_need_general_receive = false;
812 info() << "** ** FINISH TRACKING NB_RECV=" << m_nb_receive_message
813 << " (Date=" << platform::getCurrentDateTime() << ")";
814 _sendFinishExchangeParticle();
815 }
816 else if (remaining_particle<(m_nb_blocking_size*nb_rank_as_int64)){
817 //Integer nb_rank = subDomain()->nbSubDomain();
818 //m_nb_blocking_size /= 100;
819 m_nb_blocking_size = 0;
820 warning() << "** ** CHANGE BLOCKING NEW_SIZE " << m_nb_blocking_size
821 << " REMAING_PARTICLE " << remaining_particle
822 << " (Date=" << platform::getCurrentDateTime() << ")";
823
824 // Comme il peut y avoir des particules en attente, il faut
825 // maintenant les envoyer
826 if (m_waiting_local_ids.size()>0)
827 _generateSendItemsMessages(Int32ConstArrayView(),Int32ConstArrayView());
828 for( Int32 i=0; i<nb_rank; ++i ){
829 if (i==m_master_proc)
830 continue;
831 SerializeMessage* sm = new SerializeMessage(m_rank,i,ISerializeMessage::MT_Send);
832 ISerializer* sbuf = sm->serializer();
833 sbuf->setMode(ISerializer::ModeReserve);
834 sbuf->reserveInteger(1);
835 sbuf->reserveInteger(1);
836 sbuf->allocateBuffer();
837 sbuf->setMode(ISerializer::ModePut);
838 sbuf->putInteger(MESSAGE_CHANGE_BLOCKING);
839 sbuf->putInteger(m_nb_blocking_size);
840 m_pending_messages.add(sm);
841 }
842 }
843}
844
845
846/*---------------------------------------------------------------------------*/
847/*---------------------------------------------------------------------------*/
848
849void NonBlockingParticleExchanger::
850_checkInitialized()
851{
852 if (!m_item_family)
853 ARCANE_FATAL("method initialized() not called");
854 _clearMessages();
855}
856
857/*---------------------------------------------------------------------------*/
858/*---------------------------------------------------------------------------*/
859
// Registers a factory service for the class NonBlockingParticleExchanger.
// Macro arguments, in order: the class, the interface (IParticleExchanger)
// and the name of the service.
860ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(NonBlockingParticleExchanger,IParticleExchanger,
861 NonBlockingParticleExchanger);
862
863/*---------------------------------------------------------------------------*/
864/*---------------------------------------------------------------------------*/
865
866} // End namespace Arcane::mesh
867
868/*---------------------------------------------------------------------------*/
869/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
Tableau d'items de types quelconques.
Interface d'une famille d'entités.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Ref< ISerializeMessageList > createSerializeMessageListRef()=0
Créé une liste pour gérer les 'ISerializeMessage'.
virtual char reduce(eReduceType rt, char v)=0
Effectue la réduction de type rt sur le réel v et retourne la valeur.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:120
Vue constante d'un tableau de type T.
ItemVectorViewT< Particle > ParticleVectorView
Vue sur un vecteur de particules.
Definition ItemTypes.h:309
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:513
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:640
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:515
Int32 Integer
Type représentant un entier.