Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
NonBlockingParticleExchanger.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* NonBlockingParticleExchanger.cc (C) 2000-2025 */
9/* */
10/* Particle Exchanger. */
11/*---------------------------------------------------------------------------*/
12
13#include "arcane/mesh/NonBlockingParticleExchanger.h"
14
15#include "arcane/utils/List.h"
16#include "arcane/utils/FatalErrorException.h"
17#include "arcane/utils/IFunctor.h"
18#include "arcane/utils/PlatformUtils.h"
19
20#include "arcane/core/ItemGroup.h"
21#include "arcane/core/ItemVector.h"
22#include "arcane/core/IItemFamily.h"
23#include "arcane/core/IParticleFamily.h"
24#include "arcane/core/IParallelMng.h"
25#include "arcane/core/IVariableMng.h"
26#include "arcane/core/IVariable.h"
27#include "arcane/core/IMesh.h"
28#include "arcane/core/Item.h"
29#include "arcane/core/Timer.h"
30#include "arcane/core/ISerializeMessageList.h"
31#include "arcane/core/CommonVariables.h"
32#include "arcane/core/FactoryService.h"
33#include "arcane/core/internal/SerializeMessage.h"
34
35//#define ARCANE_DEBUG_EXCHANGE_ITEMS
36
37/*---------------------------------------------------------------------------*/
38/*---------------------------------------------------------------------------*/
39
40namespace Arcane::mesh
41{
42
43/*---------------------------------------------------------------------------*/
44/*---------------------------------------------------------------------------*/
45
46NonBlockingParticleExchanger::
47NonBlockingParticleExchanger(const ServiceBuildInfo& sbi)
48: BasicService(sbi)
49, m_item_family(nullptr)
50, m_parallel_mng(sbi.mesh()->parallelMng())
51, m_rank(m_parallel_mng->commRank())
52, m_timer(new Timer(m_parallel_mng->timerMng(), "NonBlockingParticleExchanger", Timer::TimerReal))
53, m_total_time_functor(0.)
54, m_total_time_waiting(0.)
55, m_nb_total_particle_finish_exchange(0)
56, m_nb_total_particle(0)
57, m_nb_original_blocking_size(0)
58, m_nb_blocking_size(m_nb_original_blocking_size)
59, m_exchange_finished(true)
60, m_master_proc(0)
61, m_need_general_receive(false)
62, m_end_message_sended(false)
63, m_can_process_messages(true)
64, m_can_process_non_blocking(false)
65, m_want_process_non_blocking(false)
66, m_want_fast_send_particles(true)
67, m_nb_receive_message(0)
68, m_nb_particle_finished_exchange(0)
69, m_verbose_level(1)
70, m_is_debug(false)
71{
72// m_want_fast_send_particles allows sending the number of particles that have finished tracking
73// at the same time, avoiding additional messages.
74#if ARCANE_DEBUG_EXCHANGE_ITEMS
75 m_is_debug = true;
76#endif
77}
78
79/*---------------------------------------------------------------------------*/
80/*---------------------------------------------------------------------------*/
81
82NonBlockingParticleExchanger::
83~NonBlockingParticleExchanger()
84{
85 // No throws are allowed in the destructor, so compilation warnings are generated
86 if (!m_pending_messages.empty() || !m_waiting_messages.empty()) {
87 String s = String::format("pending or waiting messages: nb_pending={0} nb_waiting=",
88 m_pending_messages.size(), m_waiting_messages.size());
89 warning() << s;
90 }
91
92 if (!m_waiting_local_ids.empty()) {
93 warning() << String::format("pending particles: nb_pending=", m_waiting_local_ids.size());
94 }
95
96 delete m_timer;
97}
98
99/*---------------------------------------------------------------------------*/
100/*---------------------------------------------------------------------------*/
101
102void NonBlockingParticleExchanger::
103initialize(IItemFamily* item_family)
104{
105 m_item_family = item_family;
106}
107
108/*---------------------------------------------------------------------------*/
109/*---------------------------------------------------------------------------*/
110
111void NonBlockingParticleExchanger::
112_clearMessages()
113{
114 for (Integer i = 0, is = m_accumulate_infos.size(); i < is; ++i) {
115 delete m_accumulate_infos[i];
116 m_accumulate_infos[i] = 0;
117 }
118 m_accumulate_infos.clear();
119}
120
121/*---------------------------------------------------------------------------*/
122/*---------------------------------------------------------------------------*/
123
124void NonBlockingParticleExchanger::
125beginNewExchange(Integer i_nb_particle)
126{
127 _checkInitialized();
128 IParallelMng* pm = m_parallel_mng;
129
130 m_end_message_sended = false;
131 m_exchange_finished = false;
132 m_nb_blocking_size = m_nb_original_blocking_size;
133 m_nb_receive_message = 0;
134 m_nb_particle_finished_exchange = 0;
135
136 // TODO: Use a specific tag for this exchange.
137 Int64 nb_particle = i_nb_particle;
138 m_nb_total_particle = pm->reduce(Parallel::ReduceSum, nb_particle);
139 info() << "BEGIN TRACKING TOTAL FLYING = " << m_nb_total_particle
140 << " (local=" << nb_particle << ") "
141 << " (Date=" << platform::getCurrentDateTime() << ")";
142
143 m_nb_total_particle_finish_exchange = 0;
144
145 m_need_general_receive = true;
146
147 // Retrieve the list of variables to transfer.
148 // These are variables from the same family as the ones passed as parameters.
149 // IMPORTANT: All sub-domains must have these same variables.
151 m_item_family->usedVariables(m_variables_to_exchange);
152 m_variables_to_exchange.sortByName(true);
153}
154
155/*---------------------------------------------------------------------------*/
156/*---------------------------------------------------------------------------*/
157
158bool NonBlockingParticleExchanger::
159exchangeItems(Integer nb_particle_finish_exchange,
160 Int32ConstArrayView local_ids,
161 Int32ConstArrayView sub_domains_to_send, ItemGroup item_group,
162 IFunctor* functor)
163{
164 // TODO: Remove this if it's already handled by sendItems()
165 m_nb_particle_finished_exchange += nb_particle_finish_exchange;
166 return _exchangeItems(local_ids, sub_domains_to_send, item_group, 0, functor);
167}
168
169/*---------------------------------------------------------------------------*/
170/*---------------------------------------------------------------------------*/
171
172bool NonBlockingParticleExchanger::
173exchangeItems(Integer nb_particle_finish_exchange,
174 Int32ConstArrayView local_ids,
175 Int32ConstArrayView sub_domains_to_send,
176 Int32Array* new_particle_local_ids,
177 IFunctor* functor)
178{
179 // TODO: Remove this if it's already handled by sendItems()
180 m_nb_particle_finished_exchange += nb_particle_finish_exchange;
181 return _exchangeItems(local_ids, sub_domains_to_send, ItemGroup(), new_particle_local_ids, functor);
182}
183
184/*---------------------------------------------------------------------------*/
185/*---------------------------------------------------------------------------*/
186
187void NonBlockingParticleExchanger::
188sendItems(Integer nb_particle_finish_exchange,
189 Int32ConstArrayView local_ids,
190 Int32ConstArrayView sub_domains_to_send)
191{
192 m_nb_particle_finished_exchange += nb_particle_finish_exchange;
193 _checkSendItems(local_ids, sub_domains_to_send);
194 _sendPendingMessages();
195}
196
197/*---------------------------------------------------------------------------*/
198/*---------------------------------------------------------------------------*/
199
200bool NonBlockingParticleExchanger::
201waitMessages(Integer nb_pending_particle, Int32Array* new_particle_local_ids, IFunctor* functor)
202{
203 return _waitMessages(nb_pending_particle, ItemGroup(), new_particle_local_ids, functor);
204}
205
206/*---------------------------------------------------------------------------*/
207/*---------------------------------------------------------------------------*/
208
209void NonBlockingParticleExchanger::
210_checkSendItems(Int32ConstArrayView local_ids,
211 Int32ConstArrayView sub_domains_to_send)
212{
213 Integer nb_particle = local_ids.size();
214 Integer nb_waiting_local_ids = m_waiting_local_ids.size();
215 Integer nb_waiting_sub_domains_to_send = m_waiting_sub_domains_to_send.size();
216 if ((nb_particle + nb_waiting_local_ids) >= m_nb_blocking_size) {
217 _generateSendItemsMessages(local_ids, sub_domains_to_send);
218 }
219 else {
220 // Place the particles in a buffer before sending them
221 m_waiting_local_ids.resize(nb_waiting_local_ids + nb_particle);
222 m_waiting_sub_domains_to_send.resize(nb_waiting_sub_domains_to_send + nb_particle);
223 for (Integer i = 0; i < nb_particle; ++i) {
224 m_waiting_local_ids[nb_waiting_local_ids + i] = local_ids[i];
225 m_waiting_sub_domains_to_send[nb_waiting_sub_domains_to_send + i] = sub_domains_to_send[i];
226 }
227 }
228}
229
230/*---------------------------------------------------------------------------*/
231/*---------------------------------------------------------------------------*/
232
233bool NonBlockingParticleExchanger::
234_exchangeItems(Int32ConstArrayView local_ids,
235 Int32ConstArrayView sub_domains_to_send, ItemGroup item_group,
236 Int32Array* new_particle_local_ids,
237 IFunctor* functor)
238{
239 if (m_want_fast_send_particles) {
240 if (local_ids.empty())
241 _processFinishTrackingMessage();
242 }
243 else if (!m_want_process_non_blocking)
244 _processFinishTrackingMessage();
245 if (m_exchange_finished) {
246 _sendFinishExchangeParticle();
247 m_need_general_receive = false;
248 }
249 _checkNeedReceiveMessage();
250
251 _checkSendItems(local_ids, sub_domains_to_send);
252
253 return _waitMessages(0, item_group, new_particle_local_ids, functor);
254}
255
256/*---------------------------------------------------------------------------*/
257/*---------------------------------------------------------------------------*/
258
259bool NonBlockingParticleExchanger::
260_waitMessages(Integer nb_pending_particle, ItemGroup item_group,
261 Int32Array* new_particle_local_ids, IFunctor* functor)
262{
263 ARCANE_UNUSED(nb_pending_particle);
264
265 if (!item_group.null())
266 item_group.clear();
267
268 m_can_process_messages = true;
269 m_can_process_non_blocking = m_want_process_non_blocking;
270 while (m_can_process_messages && !m_exchange_finished) {
271 m_can_process_messages = false;
272 _processMessages(item_group, new_particle_local_ids, false, functor);
273 }
274
275 if (m_exchange_finished) {
276 info(5) << " ** EXCHANGE finished: ";
277 // This ensures that all completion messages are received
278 _processMessages(item_group, new_particle_local_ids, true, 0);
279 info(5) << " ** EXCHANGE finished END: ";
280 }
281
282 info(5) << " ** RETURN EXCHANGE m_exchange_finished: " << m_exchange_finished;
283 return m_exchange_finished;
284}
285
286/*---------------------------------------------------------------------------*/
287/*---------------------------------------------------------------------------*/
288
289void NonBlockingParticleExchanger::
290_checkNeedReceiveMessage()
291{
292 if (m_need_general_receive) {
293 auto sm = new SerializeMessage(m_rank, A_NULL_RANK, ISerializeMessage::MT_Recv);
294 m_pending_messages.add(sm);
295 m_need_general_receive = false;
296 }
297}
298
299/*---------------------------------------------------------------------------*/
300/*---------------------------------------------------------------------------*/
301
302void NonBlockingParticleExchanger::
303_generateSendItemsMessages(Int32ConstArrayView local_ids,
304 Int32ConstArrayView sub_domains_to_send)
305{
306 Timer::Phase tphase(m_parallel_mng->timeStats(), TP_Communication);
307
308 IMesh* mesh = m_item_family->mesh();
309
310 Int32UniqueArray communicating_sub_domains;
311 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
312
313 Integer nb_connected_sub_domain = communicating_sub_domains.size();
314 //Integer max_sub_domain_id = 0;
315 UniqueArray<SharedArray<Int32>> ids_to_send(nb_connected_sub_domain);
316 // Information for each connected sub-domain
317 //_clearMessages();
318 m_accumulate_infos.clear();
319 m_accumulate_infos.resize(nb_connected_sub_domain);
320 for (Integer i = 0; i < nb_connected_sub_domain; ++i)
321 m_accumulate_infos[i] = new SerializeMessage(m_rank, communicating_sub_domains[i],
322 ISerializeMessage::MT_Send);
323
324 _addItemsToSend(local_ids, sub_domains_to_send, communicating_sub_domains, ids_to_send);
325 _addItemsToSend(m_waiting_local_ids, m_waiting_sub_domains_to_send,
326 communicating_sub_domains, ids_to_send);
327
328 if (m_is_debug) {
329 info() << "-- Subdomain " << m_rank << ". NB to send: " << local_ids.size()
330 << " NB connected subdomains: " << nb_connected_sub_domain;
331
332 info() << "NB connected subdomain for " << m_rank << " : " << m_accumulate_infos.size();
333 for (Integer i = 0, n = m_accumulate_infos.size(); i < n; ++i) {
334 info() << "------------- Send: rank=" << m_accumulate_infos[i]->destRank()
335 << " n=" << ids_to_send[i].size();
336 }
337 }
338
339 Int64UniqueArray items_to_send_uid;
340 Int64UniqueArray items_to_send_cells_uid; // Only for particles;
341
342 for (Integer j = 0; j < nb_connected_sub_domain; ++j) {
343 ISerializeMessage* sm = m_accumulate_infos[j];
344 // In blocking mode, always send the message because the recipient has already sent
345 // a reception message. Otherwise, send it only if it contains particles.
346 if (!ids_to_send[j].empty())
347 _serializeMessage(sm, ids_to_send[j], items_to_send_uid,
348 items_to_send_cells_uid);
349 else
350 // The message is useless since it's empty.
351 delete sm;
352 }
353
354 m_accumulate_infos.clear();
355
356 // Destroy the entities that have just been sent
357 info(5) << "NonBlockingParticleExchanger:: sendItems " << "local_ids " << local_ids.size();
358 info(5) << "NonBlockingParticleExchanger:: sendItems " << "m_waiting_local_ids " << m_waiting_local_ids.size();
359
360 m_item_family->toParticleFamily()->removeParticles(local_ids);
361 m_item_family->toParticleFamily()->removeParticles(m_waiting_local_ids);
362 m_item_family->endUpdate();
363 m_waiting_local_ids.clear();
364 m_waiting_sub_domains_to_send.clear();
365}
366
367/*---------------------------------------------------------------------------*/
368/*---------------------------------------------------------------------------*/
369
370void NonBlockingParticleExchanger::
371_addItemsToSend(Int32ConstArrayView local_ids,
372 Int32ConstArrayView sub_domains_to_send,
373 Int32ConstArrayView communicating_sub_domains,
374 UniqueArray<SharedArray<Int32>>& ids_to_send)
375{
376 String func_name("NonBlockingParticleExchanger::_addItemsToSend()");
377 Integer nb_connected_sub_domain = ids_to_send.size();
378 // Determine to which sub-domain each item should be sent.
379 // This is done based on the local_ids parameter.
380 Integer id_size = local_ids.size();
381 for (Integer i = 0; i < id_size; ++i) {
382 Int32 item_local_id = local_ids[i];
383 Integer sd_to_send = sub_domains_to_send[i];
384#ifdef ARCANE_CHECK
385 if (sd_to_send == m_rank)
386 // This item belongs to this sub-domain.
387 fatal() << func_name << "The entity with local id " << item_local_id
388 << " should not be sent to its own subdomain";
389#endif
390 // Find the index of the sub-domain to which the item belongs
391 // in the list of connected sub-domains.
392 // TODO: Use an indirect method (e.g., a table based on the number of sub-domains).
393 Integer sd_index = nb_connected_sub_domain;
394 for (Integer i_sd = 0; i_sd < nb_connected_sub_domain; ++i_sd)
395 if (sd_to_send == communicating_sub_domains[i_sd]) {
396 sd_index = i_sd;
397 break;
398 }
399#ifdef ARCANE_CHECK
400 if (sd_index == nb_connected_sub_domain)
401 fatal() << func_name << "Internal: bad subdomain index";
402#endif
403 ids_to_send[sd_index].add(item_local_id);
404 }
405}
406
407/*---------------------------------------------------------------------------*/
408/*---------------------------------------------------------------------------*/
409
410void NonBlockingParticleExchanger::
411_processMessages(ItemGroup item_group, Int32Array* new_particle_local_ids,
412 bool wait_all, IFunctor* functor)
413{
414 _sendPendingMessages();
415
416 if (functor) {
417 {
418 Timer::Sentry ts(m_timer);
419 functor->executeFunctor();
420 }
421 m_total_time_functor += m_timer->lastActivationTime();
422 info(5) << "TimeFunctor: current=" << m_timer->lastActivationTime()
423 << " total=" << m_total_time_functor;
424 }
425
426 Integer nb_message_finished = 0;
427 {
428 Timer::Sentry ts(m_timer);
429 if (wait_all)
430 nb_message_finished = m_message_list->waitMessages(Parallel::WaitAll);
431 else {
432 if (m_can_process_non_blocking)
433 nb_message_finished = m_message_list->waitMessages(Parallel::WaitSomeNonBlocking);
434 else
435 nb_message_finished = m_message_list->waitMessages(Parallel::WaitSome);
436 //info() << "Nb finished=" << nb_message_finished << " is_block=" << m_can_process_non_blocking;
437 if (nb_message_finished == 0) {
438 m_can_process_non_blocking = false;
439 m_can_process_messages = true;
440 _processFinishTrackingMessage();
441 return;
442 }
443 }
444 }
445 m_total_time_waiting += m_timer->lastActivationTime();
446 info(5) << "TimeWaiting: current=" << m_timer->lastActivationTime()
447 << " total=" << m_total_time_waiting;
448
449 // Save the messages that are currently being processed, as new messages may be added during processing
450 UniqueArray<ISerializeMessage*> current_messages(m_waiting_messages);
451 m_waiting_messages.clear();
452
453 Int64UniqueArray items_to_create_id;
454 Int64UniqueArray items_to_create_cells_id;
455 for (Integer i = 0, is = current_messages.size(); i < is; ++i) {
456 ISerializeMessage* sm = current_messages[i];
457 if (sm->finished()) {
458 if (!sm->isSend()) {
459 _deserializeMessage(sm, items_to_create_id, items_to_create_cells_id, item_group, new_particle_local_ids);
460 ++m_nb_receive_message;
461 }
462 delete sm;
463 }
464 else
465 m_waiting_messages.add(sm);
466 }
467}
468
469/*---------------------------------------------------------------------------*/
470/*---------------------------------------------------------------------------*/
471
472void NonBlockingParticleExchanger::
473_sendPendingMessages()
474{
475 IParallelMng* pm = m_parallel_mng;
476
477 _checkNeedReceiveMessage();
478
479 if (!m_message_list.get())
480 m_message_list = pm->createSerializeMessageListRef();
481
482 {
483 Timer::Sentry ts(m_timer);
484 // Add the messages that are waiting to be processed
485 Integer nb_message = m_pending_messages.size();
486 for (Integer i = 0; i < nb_message; ++i) {
487 m_message_list->addMessage(m_pending_messages[i]);
488 m_waiting_messages.add(m_pending_messages[i]);
489 }
490 m_message_list->processPendingMessages();
491 m_pending_messages.clear();
492 }
493 info(5) << "TimeSendMessages=" << m_timer->lastActivationTime()
494 << " buffersize=" << m_waiting_local_ids.size();
495}
496
497/*---------------------------------------------------------------------------*/
498/*---------------------------------------------------------------------------*/
499
500void NonBlockingParticleExchanger::
501_serializeMessage(ISerializeMessage* sm,
502 Int32ConstArrayView acc_ids,
503 Int64Array& items_to_send_uid,
504 Int64Array& items_to_send_cells_uid)
505{
506 ParticleInfoListView internal_items(m_item_family);
507
508 ISerializer* sbuf = sm->serializer();
509 sbuf->setMode(ISerializer::ModeReserve);
510
511 //for( Integer j=0; j<nb_connected_sub_domain; ++j ){
512 //ConstArrayView<Integer> acc_ids = m_ids_to_send[j];
513 Integer nb_item = acc_ids.size();
514 // Reserve space for the message type
515 sbuf->reserveInteger(1);
516 if (m_want_fast_send_particles) {
517 // Reserve space for the number of particles to be sent
518 sbuf->reserveInt64(1);
519 }
520 // Reserve space for the sender's rank
521 sbuf->reserveInt32(1);
522 // Reserve space for the number of uniqueId()
523 sbuf->reserveInt64(1);
524 // Reserve space for the uniqueId() of the particles
525 sbuf->reserveSpan(eBasicDataType::Int64, nb_item);
526 // Reserve space for the uniqueId() of the cells containing the particles
527 sbuf->reserveSpan(eBasicDataType::Int64, nb_item);
528
529 for (VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var;) {
530 IVariable* var = *i_var;
531 var->serialize(sbuf, acc_ids);
532 }
533
534 // Serialize the data for writing
535 sbuf->allocateBuffer();
536 sbuf->setMode(ISerializer::ModePut);
537
538 sbuf->putInteger(MESSAGE_EXCHANGE);
539 if (m_want_fast_send_particles) {
540 sbuf->putInt64(m_nb_particle_finished_exchange);
541 m_nb_particle_finished_exchange = 0;
542 }
543 sbuf->putInt32(m_rank);
544 sbuf->putInt64(nb_item);
545 items_to_send_uid.resize(nb_item);
546 items_to_send_cells_uid.resize(nb_item);
547
548 for (Integer z = 0; z < nb_item; ++z) {
549 Particle item = internal_items[acc_ids[z]];
550 items_to_send_uid[z] = item.uniqueId();
551 bool has_cell = item.hasCell();
552 items_to_send_cells_uid[z] = (has_cell) ? item.cell().uniqueId() : NULL_ITEM_UNIQUE_ID;
553#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
554#if 0
555 info() << "Particle BufID=" << acc_ids[z]
556 << " LID=" << item.localId()
557 << " UID=" << items_to_send_uid[z]
558 << " CellIUID=" << items_to_send_cells_uid[z]
559 << " (owner=" << item.cell().owner() << ")";
560#endif
561#endif
562 }
563 sbuf->putSpan(items_to_send_uid);
564 sbuf->putSpan(items_to_send_cells_uid);
565
566 for (VariableList::Enumerator i_var(m_variables_to_exchange); ++i_var;) {
567 IVariable* var = *i_var;
568 var->serialize(sbuf, acc_ids);
569 }
570
571 m_pending_messages.add(sm);
572}
573
574/*---------------------------------------------------------------------------*/
575/*---------------------------------------------------------------------------*/
576
577void NonBlockingParticleExchanger::
578_deserializeMessage(ISerializeMessage* message,
579 Int64Array& items_to_create_unique_id,
580 Int64Array& items_to_create_cells_id,
581 ItemGroup item_group,
582 Int32Array* new_particle_local_ids)
583{
584
585 IMesh* mesh = m_item_family->mesh();
586 ISerializer* sbuf = message->serializer();
587
588 // Set the mode to read the serialized data
589 sbuf->setMode(ISerializer::ModeGet);
590 sbuf->setReadMode(ISerializer::ReadReplace);
591 Int32UniqueArray items_to_create_local_id;
592 Int32UniqueArray cells_lid;
593
594 Integer message_type = sbuf->getInteger();
595 info(4) << "Deserialise message_type=" << (int)message_type;
596 switch (message_type) {
597 case MESSAGE_EXCHANGE: {
598 m_need_general_receive = true;
599 if (m_want_fast_send_particles) {
600 Int64 nb_finished = sbuf->getInt64();
601 m_nb_particle_finished_exchange += nb_finished;
602 }
603 Int32 orig_rank = sbuf->getInt32();
604 Int64 nb_item = sbuf->getInt64();
605 if (m_is_debug)
606 info() << "------------- Receive: rank=" << orig_rank << " particle nb=" << nb_item
607 << " (orig_rank=" << message->destination() << ")";
608
609 //if (nb_item!=0)
610 //info() << "Receiving particules n=" << nb_item;
611 items_to_create_local_id.resize(nb_item);
612 items_to_create_unique_id.resize(nb_item);
613 items_to_create_cells_id.resize(nb_item);
614 sbuf->getSpan(items_to_create_unique_id);
615 sbuf->getSpan(items_to_create_cells_id);
616#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
617 //info() << "Recv from SID " << sync_infos[i].subDomain() << " N=" << nb_item;
618#if 0
619 for( Integer z=0; z<nb_item; ++z ){
620 info() << "Particle UID=" << items_to_create_unique_id[z]
621 << " CellIUID=" << items_to_create_cells_id[z];
622 }
623#endif
624#endif
625 cells_lid.resize(nb_item);
626 mesh->cellFamily()->itemsUniqueIdToLocalId(cells_lid, items_to_create_cells_id);
627
628 items_to_create_local_id.resize(nb_item);
629 ParticleVectorView particles_view = m_item_family->toParticleFamily()->addParticles(items_to_create_unique_id,
630 cells_lid,
631 items_to_create_local_id);
632 info(5) << "Nb create=" << particles_view.size();
633
634 // Notify the family that the modifications have been completed.
635 // After this method is called, the variables can be used again.
636 m_item_family->endUpdate();
637
638 // Convert the uniqueId() values obtained into localId() values for the particles,
639 // and assign the corresponding cells.
640 ParticleInfoListView internal_items(m_item_family);
641 //ItemInternalList internal_cells(mesh->itemsInternal(IK_Cell));
642 //m_item_family->itemsUniqueIdToLocalId(items_to_create_unique_id, items_to_create_unique_id);
643
644 for (Integer z = 0; z < nb_item; ++z) {
645 Particle item = internal_items[items_to_create_local_id[z]];
646 //item.setCell( internal_cells[cells_lid[z]] );
647 // I am the new owner of these particles (TODO: Do not perform this here).
648 item.mutableItemBase().setOwner(m_rank, m_rank);
649 }
650 if (!item_group.null())
651 item_group.addItems(items_to_create_local_id, false);
652 if (new_particle_local_ids)
653 new_particle_local_ids->addRange(items_to_create_local_id);
654
655 for (VariableCollection::Enumerator i_var(m_variables_to_exchange); ++i_var;) {
656 IVariable* var = *i_var;
657 var->serialize(sbuf, items_to_create_local_id);
658 }
659 } break;
660 case MESSAGE_NB_FINISH_EXCHANGE: {
661 m_need_general_receive = true;
662 // Indicate that it is possible to continue receiving messages, as this message
663 // does not indicate the completion of any process.
664 m_can_process_messages = true;
665 Int64 nb_particle = sbuf->getInt64();
666 Int32 orig_rank = sbuf->getInt32();
667 if (m_is_debug)
668 info() << "MESSAGE_NB_FINISH_EXCHANGE nb=" << nb_particle << " (from rank=" << orig_rank << ")";
669 _addFinishExchangeParticle(nb_particle);
670 } break;
671 case MESSAGE_FINISH_EXCHANGE_STATUS: {
672 m_nb_total_particle_finish_exchange = sbuf->getInt64();
673 m_exchange_finished = (m_nb_total_particle_finish_exchange == m_nb_total_particle);
674 //#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
675 info() << "** RECEIVING FINISH EXCHANGE " << m_exchange_finished
676 << " finish=" << m_nb_total_particle_finish_exchange
677 << " total=" << m_nb_total_particle;
678 //#endif
679 //if (m_exchange_finished)
680 //warning() << "Exchange finished ! " << m_current_iteration;
681 } break;
682 case MESSAGE_CHANGE_BLOCKING: {
683 m_need_general_receive = true;
684
685 Integer nb_blocking_size = sbuf->getInteger();
686 // It is necessary to ensure that the new blocking_size is less than the current value,
687 // which may happen when multiple messages of this type are received simultaneously.
688 if (nb_blocking_size < m_nb_blocking_size)
689 m_nb_blocking_size = nb_blocking_size;
690 info(4) << "** RECEIVING CHANGE BLOCKING"
691 << " new_blocking_size=" << m_nb_blocking_size;
692 // Since there may still be particles waiting to be sent, they need to be sent now.
693 if (m_waiting_local_ids.size() > 0)
694 _generateSendItemsMessages(Int32ConstArrayView(), Int32ConstArrayView());
695 } break;
696 }
697}
698
699/*---------------------------------------------------------------------------*/
700/*---------------------------------------------------------------------------*/
701
702void NonBlockingParticleExchanger::
703reset()
704{
705 if (!m_waiting_messages.empty())
706 ARCANE_FATAL("reset() waiting parallel requests");
707 _clearMessages();
708}
709
710/*---------------------------------------------------------------------------*/
711/*---------------------------------------------------------------------------*/
712
713void NonBlockingParticleExchanger::
714_processFinishTrackingMessage()
715{
716 // If the processor is m_master_proc:
717 // - Receive the number of messages that have been completed from other processors.
718 // - Calculate the total number of completed messages.
719 // - Send this information to all processors.
720 // If the processor is not m_master_proc:
721 // - Send the value of nb_finish_tracking_particle to m_master_proc.
722 // - Receive the total number of messages that have been completed from m_master_proc.
723 if (m_rank == m_master_proc) {
724 _addFinishExchangeParticle(m_nb_particle_finished_exchange);
725 }
726 else {
727 // Send the number of particles to m_master_proc.
728 if (m_nb_particle_finished_exchange != 0) {
729 info(4) << "Send to master proc (" << m_master_proc << ") nb_finish=" << m_nb_particle_finished_exchange;
730 SerializeMessage* sm = new SerializeMessage(m_rank, m_master_proc, ISerializeMessage::MT_Send);
731 ISerializer* sbuf = sm->serializer();
732 sbuf->setMode(ISerializer::ModeReserve);
733 sbuf->reserveInteger(1);
734 sbuf->reserveInt64(1);
735 sbuf->reserveInt32(1);
736 sbuf->allocateBuffer();
737 sbuf->setMode(ISerializer::ModePut);
738 sbuf->putInteger(MESSAGE_NB_FINISH_EXCHANGE);
739 sbuf->putInt64(m_nb_particle_finished_exchange);
740 sbuf->putInt32(m_rank);
741 m_pending_messages.add(sm);
742 }
743 }
744 m_nb_particle_finished_exchange = 0;
745}
746
747/*---------------------------------------------------------------------------*/
748/*---------------------------------------------------------------------------*/
749
750void NonBlockingParticleExchanger::
751_sendFinishExchangeParticle()
752{
753 Int32 nb_rank = m_parallel_mng->commSize();
754 if (m_rank != m_master_proc || m_end_message_sended)
755 return;
756 m_end_message_sended = true;
757 info(4) << " ** ** SEND FINISH EXCHANGE PARTICLE2";
758 for (Integer i = 0; i < nb_rank; ++i) {
759 if (i == m_master_proc)
760 continue;
761 SerializeMessage* sm = new SerializeMessage(m_rank, i, ISerializeMessage::MT_Send);
762 ISerializer* sbuf = sm->serializer();
763 sbuf->setMode(ISerializer::ModeReserve);
764 sbuf->reserveInteger(1);
765 sbuf->reserveInt64(1);
766 sbuf->allocateBuffer();
767 sbuf->setMode(ISerializer::ModePut);
768 sbuf->putInteger(MESSAGE_FINISH_EXCHANGE_STATUS);
769 sbuf->putInt64(m_nb_total_particle_finish_exchange);
770 m_pending_messages.add(sm);
771 }
772}
773
774/*---------------------------------------------------------------------------*/
775/*---------------------------------------------------------------------------*/
776
777void NonBlockingParticleExchanger::
778_addFinishExchangeParticle(Int64 nb_particle_finish_exchange)
779{
780 m_nb_total_particle_finish_exchange += nb_particle_finish_exchange;
781 Int32 nb_rank = m_parallel_mng->commSize();
782 Int64 nb_rank_as_int64 = nb_rank;
783 //#ifdef ARCANE_DEBUG_EXCHANGE_ITEMS
784 info(4) << "** RECEIVING FINISH EXCHANGE n=" << nb_particle_finish_exchange
785 << " totalfinish=" << m_nb_total_particle_finish_exchange
786 << " total=" << m_nb_total_particle;
787 //#endif
788 Int64 remaining_particle = m_nb_total_particle - m_nb_total_particle_finish_exchange;
789 if (remaining_particle == 0) {
790 m_exchange_finished = true;
791 m_need_general_receive = false;
792 info() << "** ** FINISH TRACKING NB_RECV=" << m_nb_receive_message
793 << " (Date=" << platform::getCurrentDateTime() << ")";
794 _sendFinishExchangeParticle();
795 }
796 else if (remaining_particle < (m_nb_blocking_size * nb_rank_as_int64)) {
797 //Integer nb_rank = subDomain()->nbSubDomain();
798 //m_nb_blocking_size /= 100;
799 m_nb_blocking_size = 0;
800 warning() << "** ** CHANGE BLOCKING NEW_SIZE " << m_nb_blocking_size
801 << " REMAING_PARTICLE " << remaining_particle
802 << " (Date=" << platform::getCurrentDateTime() << ")";
803
804 // Since there may still be particles waiting to be sent, they need to be sent now.
805 if (m_waiting_local_ids.size() > 0)
806 _generateSendItemsMessages(Int32ConstArrayView(), Int32ConstArrayView());
807 for (Int32 i = 0; i < nb_rank; ++i) {
808 if (i == m_master_proc)
809 continue;
810 SerializeMessage* sm = new SerializeMessage(m_rank, i, ISerializeMessage::MT_Send);
811 ISerializer* sbuf = sm->serializer();
812 sbuf->setMode(ISerializer::ModeReserve);
813 sbuf->reserveInteger(1);
814 sbuf->reserveInteger(1);
815 sbuf->allocateBuffer();
816 sbuf->setMode(ISerializer::ModePut);
817 sbuf->putInteger(MESSAGE_CHANGE_BLOCKING);
818 sbuf->putInteger(m_nb_blocking_size);
819 m_pending_messages.add(sm);
820 }
821 }
822}
823
824/*---------------------------------------------------------------------------*/
825/*---------------------------------------------------------------------------*/
826
827void NonBlockingParticleExchanger::
828_checkInitialized()
829{
830 if (!m_item_family)
831 ARCANE_FATAL("method initialized() not called");
832 _clearMessages();
833}
834
835/*---------------------------------------------------------------------------*/
836/*---------------------------------------------------------------------------*/
837
838ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(NonBlockingParticleExchanger, IParticleExchanger,
839 NonBlockingParticleExchanger);
840
841/*---------------------------------------------------------------------------*/
842/*---------------------------------------------------------------------------*/
843
844} // End namespace Arcane::mesh
845
846/*---------------------------------------------------------------------------*/
847/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
#define ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(aclass, ainterface, aname)
Registers a factory service for the class aclass.
void clear()
Removes the elements from the array.
constexpr Integer size() const noexcept
Number of elements in the array.
Interface of an entity family.
Definition IItemFamily.h:83
Interface of the parallelism manager for a subdomain.
virtual char reduce(eReduceType rt, char v)=0
Performs a reduction of type rt on the real v and returns the value.
Mesh entity group.
Definition ItemGroup.h:51
Int32 size() const
Number of elements in the vector.
TraceMessage info() const
Flow for an information message.
VariableList m_variables_to_exchange
List of variables to exchange.
Integer m_nb_blocking_size
Number of remaining particles before switching to blocking mode.
ItemVectorViewT< Particle > ParticleVectorView
View over a vector of particles.
Definition ItemTypes.h:310
String getCurrentDateTime()
Current date and time in ISO 8601 format.
UniqueArray< Int64 > Int64UniqueArray
Dynamic 1D array of 64-bit integers.
Definition UtilsTypes.h:339
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
Definition UtilsTypes.h:482
UniqueArray< Int32 > Int32UniqueArray
Dynamic 1D array of 32-bit integers.
Definition UtilsTypes.h:341
Array< Int32 > Int32Array
Dynamic one-dimensional array of 32-bit integers.
Definition UtilsTypes.h:127