Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
AsyncParticleExchanger.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* AsyncParticleExchanger.cc (C) 2000-2025 */
9/* */
10/* Asynchronous particle exchanger. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/mesh/AsyncParticleExchanger.h"
15#include "arcane/core/IParallelNonBlockingCollective.h"
16
17#include "arcane/core/internal/SerializeMessage.h"
18
19/*---------------------------------------------------------------------------*/
20/*---------------------------------------------------------------------------*/
21
22namespace Arcane::mesh
23{
24using namespace Arcane::MessagePassing;
25
26/*---------------------------------------------------------------------------*/
27/*---------------------------------------------------------------------------*/
28
29AsyncParticleExchanger::
30AsyncParticleExchanger(const ServiceBuildInfo& sbi)
31: BasicService(sbi)
32, m_bpe(sbi)
33{
34}
35
36/*---------------------------------------------------------------------------*/
37/*---------------------------------------------------------------------------*/
38
39AsyncParticleExchanger::
40~AsyncParticleExchanger()
41{
42}
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
46
47/*---------------------------------------------------------------------------*/
48/*---------------------------------------------------------------------------*/
49
50void AsyncParticleExchanger::
51build()
52{
53 m_bpe.build();
54 // By default sets the verbosity level to 0 to avoid too many messages
55 // during asynchronous phases.
56 m_bpe.setVerboseLevel(0);
57}
58
59/*---------------------------------------------------------------------------*/
60/*---------------------------------------------------------------------------*/
61
62void AsyncParticleExchanger::
63initialize(IItemFamily* item_family)
64{
65 m_bpe.initialize(item_family);
66 IParallelMng* pm = m_bpe.m_parallel_mng;
67 if (pm->isParallel()) {
69 if (!pnbc)
71 "AsyncParticleExchanger is not supported because NonBlocking"
72 " collectives are not available");
73 }
74}
75
76/*---------------------------------------------------------------------------*/
77/*---------------------------------------------------------------------------*/
78
79void AsyncParticleExchanger::
80beginNewExchange(Integer nb_particule)
81{
82 info() << "AsyncParticleExchanger is used. It also use BasicParticleExchanger functionnalities";
83 m_bpe.beginNewExchange(nb_particule);
84
85 m_nb_particle_send_before_reduction = 0;
86 m_nb_particle_send_before_reduction_tmp = 0;
87 m_sum_of_nb_particle_sent = 1;
88}
89
90/*---------------------------------------------------------------------------*/
91/*---------------------------------------------------------------------------*/
92
93bool AsyncParticleExchanger::
94exchangeItems(Integer nb_particle_finish_exchange,
95 Int32ConstArrayView local_ids,
96 Int32ConstArrayView sub_domains_to_send,
97 ItemGroup item_group,
98 IFunctor* functor)
99{
100 return m_bpe.exchangeItems(nb_particle_finish_exchange, local_ids,
101 sub_domains_to_send, item_group, functor);
102}
103
104/*---------------------------------------------------------------------------*/
105/*---------------------------------------------------------------------------*/
106
107bool AsyncParticleExchanger::
108exchangeItems(Integer nb_particle_finish_exchange,
109 Int32ConstArrayView local_ids,
110 Int32ConstArrayView sub_domains_to_send,
111 Int32Array* new_particle_local_ids,
112 IFunctor* functor)
113{
114 return m_bpe.exchangeItems(nb_particle_finish_exchange, local_ids,
115 sub_domains_to_send, new_particle_local_ids, functor);
116}
117
118/*---------------------------------------------------------------------------*/
119/*---------------------------------------------------------------------------*/
120
121void AsyncParticleExchanger::
122sendItems(Integer nb_particle_finish_exchange,
123 Int32ConstArrayView local_ids,
124 Int32ConstArrayView sub_domains_to_send)
125{
126 m_bpe.sendItems(nb_particle_finish_exchange, local_ids, sub_domains_to_send);
127}
128
129/*---------------------------------------------------------------------------*/
130/*---------------------------------------------------------------------------*/
131
132bool AsyncParticleExchanger::
133waitMessages(Integer nb_pending_particles,
134 Int32Array* new_particle_local_ids,
135 IFunctor* functor)
136{
137 return m_bpe.waitMessages(nb_pending_particles, new_particle_local_ids, functor);
138}
139
140/*---------------------------------------------------------------------------*/
141/*---------------------------------------------------------------------------*/
142
143void AsyncParticleExchanger::
144addNewParticles(Integer nb_particle)
145{
146 m_bpe.addNewParticles(nb_particle);
147}
148
149/*---------------------------------------------------------------------------*/
150/*---------------------------------------------------------------------------*/
151
152IItemFamily* AsyncParticleExchanger::
153itemFamily()
154{
155 return m_bpe.itemFamily();
156}
157
158/*---------------------------------------------------------------------------*/
159/*---------------------------------------------------------------------------*/
160
161void AsyncParticleExchanger::
162setVerboseLevel(Integer level)
163{
164 m_bpe.setVerboseLevel(level);
165}
166
167/*---------------------------------------------------------------------------*/
168/*---------------------------------------------------------------------------*/
169
170Integer AsyncParticleExchanger::
171verboseLevel() const
172{
173 return m_bpe.verboseLevel();
174}
175
176/*---------------------------------------------------------------------------*/
177/*---------------------------------------------------------------------------*/
178
179IAsyncParticleExchanger* AsyncParticleExchanger::
180asyncParticleExchanger()
181{
182 return this;
183}
184
185/*---------------------------------------------------------------------------*/
186/*---------------------------------------------------------------------------*/
187
188bool AsyncParticleExchanger::
189exchangeItemsAsync(Integer nb_particle_finish_exchange,
190 Int32ConstArrayView local_ids,
191 Int32ConstArrayView sub_domains_to_send,
192 Int32Array* new_particle_local_ids,
193 IFunctor* functor,
194 bool has_local_flying_particles)
195{
196 ARCANE_UNUSED(nb_particle_finish_exchange);
197 ARCANE_UNUSED(functor);
198
199 bool is_finished = false;
200 ++m_bpe.m_nb_loop;
201
202 // Generates all Isend and Imrecv matched with Improbe
203 m_bpe.m_nb_particle_send = local_ids.size();
204 {
205 Timer::Sentry ts(m_bpe.m_timer);
206 _generateSendItemsAsync(local_ids, sub_domains_to_send);
207 }
208 if (m_bpe.m_verbose_level >= 1)
209 info() << "ASE_BeginLoop loop=" << m_bpe.m_nb_loop;
210 m_bpe._sendPendingMessages();
211
212 if (new_particle_local_ids)
213 new_particle_local_ids->clear();
214
215 bool has_new_particle = _waitSomeMessages(ItemGroup(), new_particle_local_ids);
216 if (has_new_particle)
217 has_local_flying_particles = true;
218
219 //----------------------------------------
220 // Here is the core of the stopping condition algorithm when using AsyncParticleExchanger
221 //
222 //If chunk size == 0 && no req(red) in flight
223 //If (Q > 0) with Q being the number of particles in flight (result of the Iallreduce)
224 // Iallreduce (P, Q, req(red));
225 // P=0; with P being the number of particles sent since the last Iallreduce
226 //Otherwise
227 // return is_finished = true
228 //
229
230 IParallelMng* pm = m_bpe.m_parallel_mng;
231 UniqueArray<Integer> isIallReduceRunning = pm->testSomeRequests(m_reduce_requests);
232
233 //If the request matched, we clear the request array
234 if (isIallReduceRunning.size() != 0) {
235 m_reduce_requests.clear();
236 if (m_bpe.m_verbose_level >= 1)
237 info() << "PSM_IAllReduceFinished loop=" << m_bpe.m_nb_loop
238 << " total=" << m_sum_of_nb_particle_sent;
239 }
240
241 //Here, we test if we have particles to process locally
242 //If there are no Iallreduce requests in flight
243 //and no requests to send or receive in flight
244 if ((!has_local_flying_particles) && (m_reduce_requests.size() == 0) && (m_bpe.m_waiting_messages.size() == 0) && (m_bpe.m_pending_messages.size() == 0)) {
245 if (m_sum_of_nb_particle_sent > 0) {
246 //Perform MPI_Iallreduce
248 m_nb_particle_send_before_reduction = m_nb_particle_send_before_reduction_tmp;
249 if (m_bpe.m_verbose_level >= 1)
250 info() << "PSM_DoIAllReduce loop=" << m_bpe.m_nb_loop
251 << " n=" << m_nb_particle_send_before_reduction
252 << " nb_to_send=" << local_ids.size();
253 m_reduce_requests.add(pnbc->allReduce(Parallel::ReduceSum,
254 ConstArrayView<Integer>(1, &m_nb_particle_send_before_reduction),
255 ArrayView<Integer>(1, &m_sum_of_nb_particle_sent)));
256 m_nb_particle_send_before_reduction_tmp = 0;
257 }
258 else {
259 is_finished = true; // is_finished = true, there are no more particles to process globally
260 }
261 }
262 return is_finished;
263}
264
265/*---------------------------------------------------------------------------*/
266/*---------------------------------------------------------------------------*/
267
268void AsyncParticleExchanger::
269_generateSendItemsAsync(Int32ConstArrayView local_ids, Int32ConstArrayView sub_domains_to_send)
270{
271 Timer::Phase tphase(subDomain(), TP_Communication);
272
273 IMesh* mesh = m_bpe.m_item_family->mesh();
274
275 Int32UniqueArray communicating_sub_domains;
276 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
277
278 Integer nb_connected_sub_domain = communicating_sub_domains.size();
279
280 UniqueArray<SharedArray<Int32>> ids_to_send(nb_connected_sub_domain);
281 // Info for each connected sub-domain
282 m_bpe.m_accumulate_infos.clear();
283 m_bpe.m_accumulate_infos.resize(nb_connected_sub_domain);
284
285 m_bpe._addItemsToSend(local_ids, sub_domains_to_send, communicating_sub_domains, ids_to_send);
286
287 Int64UniqueArray items_to_send_uid;
288 Int64UniqueArray items_to_send_cells_uid; // Only for particles;
289
290 IParallelMng* pm = m_bpe.m_parallel_mng;
291
292 //-------------------------------
293 // Handling particle sends
294 //
295 // [HT] In asynchronous mode, we must only send if we have particles
296 // and receptions will be done with MPI_Improbe
297 for (Integer j = 0; j < nb_connected_sub_domain; ++j) {
298 if (ids_to_send[j].size() != 0) {
299 auto* sm = new SerializeMessage(pm->commRank(), communicating_sub_domains[j],
300 ISerializeMessage::MT_Send);
301 m_bpe.m_accumulate_infos[j] = sm;
302 m_bpe._serializeMessage(sm, ids_to_send[j], items_to_send_uid, items_to_send_cells_uid);
303 m_bpe.m_pending_messages.add(sm);
304 m_nb_particle_send_before_reduction_tmp += ids_to_send[j].size();
305 }
306 }
307
308 //-------------------------------
309 // Handling particle receives
310 //
311 // [HT] In asynchronous mode, receptions are done with MPI_Improbe and MPI_Imrecv
312 for (Integer j = 0; j < nb_connected_sub_domain; ++j) {
313
314 MessageTag tag(Arcane::MessagePassing::internal::BasicSerializeMessage::DEFAULT_SERIALIZE_TAG_VALUE);
315 MessageRank rank(communicating_sub_domains[j]);
316 PointToPointMessageInfo message(rank, tag);
317 message.setBlocking(false);
318 MessageId mid = pm->probe(message);
319
320 if (mid.isValid()) {
321 SerializeMessage* recv_sm = new SerializeMessage(m_bpe.subDomain()->subDomainId(), mid);
322 m_bpe.m_pending_messages.add(recv_sm);
323 }
324 }
325
326 m_bpe.m_accumulate_infos.clear();
327 // Deletes the entities that were just sent
328 m_bpe.m_item_family->toParticleFamily()->removeParticles(local_ids);
329 m_bpe.m_item_family->endUpdate();
330}
331
332/*---------------------------------------------------------------------------*/
333/*---------------------------------------------------------------------------*/
334
335bool AsyncParticleExchanger::
336_waitSomeMessages(ItemGroup item_group, Int32Array* new_particle_local_ids)
337{
338 {
339 Timer::Sentry ts(m_bpe.m_timer);
340 m_bpe.m_message_list->waitMessages(Parallel::WaitSomeNonBlocking);
341 }
342 m_bpe.m_total_time_waiting += m_bpe.m_timer->lastActivationTime();
343
344 // Save the currently processed communications because processing
345 // might add new ones
346 UniqueArray<ISerializeMessage*> current_messages(m_bpe.m_waiting_messages);
347
348 m_bpe.m_waiting_messages.clear();
349
350 Int64UniqueArray items_to_create_unique_id;
351 Int64UniqueArray items_to_create_cells_unique_id;
352 Int32UniqueArray items_to_create_local_id;
353 Int32UniqueArray items_to_create_cells_local_id;
354 bool has_new_particle = false;
355 for (Integer i = 0, is = current_messages.size(); i < is; ++i) {
356 ISerializeMessage* sm = current_messages[i];
357 if (sm->finished()) {
358 if (!sm->isSend()) { //If the msg is a recv
359 m_bpe._deserializeMessage(sm, items_to_create_unique_id, items_to_create_cells_unique_id,
360 items_to_create_local_id, items_to_create_cells_local_id,
361 item_group, new_particle_local_ids);
362 // Indicates that particles were received and therefore it should be stated
363 // that has_local_flying_particle is true
364 if (!items_to_create_unique_id.empty())
365 has_new_particle = true;
366 }
367 delete sm;
368 }
369 else {
370 m_bpe.m_waiting_messages.add(sm);
371 }
372 }
373 return has_new_particle;
374}
375/*---------------------------------------------------------------------------*/
376/*---------------------------------------------------------------------------*/
377
378ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(AsyncParticleExchanger,
380 AsyncParticleExchanger);
381ARCANE_REGISTER_SUB_DOMAIN_FACTORY(AsyncParticleExchanger,
383 AsyncParticleExchanger);
384
385/*---------------------------------------------------------------------------*/
386/*---------------------------------------------------------------------------*/
387
388} // End namespace Arcane::mesh
389
390/*---------------------------------------------------------------------------*/
391/*---------------------------------------------------------------------------*/
#define ARCANE_THROW(exception_class,...)
Macro for throwing an exception with formatting.
#define ARCANE_REGISTER_SUB_DOMAIN_FACTORY(aclass, ainterface, aname)
Registers a factory service for the class aclass.
#define ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(aclass, ainterface, aname)
Registers a factory service for the class aclass.
Integer size() const
Number of elements in the vector.
bool empty() const
Returns true if the vector contains no elements.
Modifiable view of an array of type T.
void clear()
Removes the elements from the array.
Constant view of an array of type T.
constexpr Integer size() const noexcept
Number of elements in the array.
Interface of an asynchronous particle exchanger.
Interface of an entity family.
Definition IItemFamily.h:83
Interface of the parallelism manager for a subdomain.
virtual Int32 commRank() const =0
Rank of this instance in the communicator.
virtual MessageId probe(const PointToPointMessageInfo &message)=0
Probes if messages are available.
virtual IParallelNonBlockingCollective * nonBlockingCollective() const =0
Interface for non-blocking collective operations.
virtual UniqueArray< Integer > testSomeRequests(ArrayView< Request > rvalues)=0
Tests if one of the rvalues requests is complete.
virtual bool isParallel() const =0
Returns true if the execution is parallel.
Interface for non-blocking collective parallel operations.
virtual Request allReduce(eReduceType rt, ConstArrayView< char > send_buf, ArrayView< char > recv_buf)=0
Performs the reduction of type rt on the array send_buf and stores the result in recv_buf.
Interface of a particle exchanger.
Mesh entity group.
Definition ItemGroup.h:51
virtual bool finished() const =0
true if the message is finished
virtual bool isSend() const =0
true if it should send, false if it should receive
Information for sending/receiving a point-to-point message.
Message using a SerializeBuffer.
Positions the phase of the currently executing action.
Definition Timer.h:142
Sentinel for the timer. The sentinel associated with a timer allows it to be triggered upon its const...
Definition Timer.h:90
TraceMessage info() const
Flow for an information message.
1D data vector with value semantics (STL style).
UniqueArray< Int64 > Int64UniqueArray
Dynamic 1D array of 64-bit integers.
Definition UtilsTypes.h:339
Int32 Integer
Type representing an integer.
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
Definition UtilsTypes.h:482
UniqueArray< Int32 > Int32UniqueArray
Dynamic 1D array of 32-bit integers.
Definition UtilsTypes.h:341
Array< Int32 > Int32Array
Dynamic one-dimensional array of 32-bit integers.
Definition UtilsTypes.h:127