Arcane  v3.16.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
AsyncParticleExchanger.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* AsyncParticleExchanger.cc (C) 2000-2025 */
9/* */
10/* Echangeur de particules asynchrone. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/mesh/AsyncParticleExchanger.h"
15#include "arcane/core/IParallelNonBlockingCollective.h"
16
17#include "arcane/core/internal/SerializeMessage.h"
18
19/*---------------------------------------------------------------------------*/
20/*---------------------------------------------------------------------------*/
21
22namespace Arcane::mesh
23{
24using namespace Arcane::MessagePassing;
25
26/*---------------------------------------------------------------------------*/
27/*---------------------------------------------------------------------------*/
28
29AsyncParticleExchanger::
30AsyncParticleExchanger(const ServiceBuildInfo& sbi)
31: BasicService(sbi)
32, m_bpe(sbi)
33{
34}
35
36/*---------------------------------------------------------------------------*/
37/*---------------------------------------------------------------------------*/
38
39AsyncParticleExchanger::
40~AsyncParticleExchanger()
41{
42}
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
46
47/*---------------------------------------------------------------------------*/
48/*---------------------------------------------------------------------------*/
49
50void AsyncParticleExchanger::
51build()
52{
53 m_bpe.build();
54 // Par défaut met à 0 le niveau de verbosité pour éviter trop de messages
55 // lors des phases asynchrones.
56 m_bpe.setVerboseLevel(0);
57}
58
59/*---------------------------------------------------------------------------*/
60/*---------------------------------------------------------------------------*/
61
62void AsyncParticleExchanger::
63initialize(IItemFamily* item_family)
64{
65 m_bpe.initialize(item_family);
66 IParallelMng* pm = m_bpe.m_parallel_mng;
67 if (pm->isParallel()){
69 if (!pnbc)
71 "AsyncParticleExchanger is not supported because NonBlocking"
72 " collectives are not available");
73 }
74}
75
76/*---------------------------------------------------------------------------*/
77/*---------------------------------------------------------------------------*/
78
79void AsyncParticleExchanger::
80beginNewExchange(Integer nb_particule)
81{
82 info() << "AsyncParticleExchanger is used. It also use BasicParticleExchanger functionnalities";
83 m_bpe.beginNewExchange(nb_particule);
84
85 m_nb_particle_send_before_reduction = 0;
86 m_nb_particle_send_before_reduction_tmp = 0;
87 m_sum_of_nb_particle_sent = 1;
88}
89
90/*---------------------------------------------------------------------------*/
91/*---------------------------------------------------------------------------*/
92
93bool AsyncParticleExchanger::
94exchangeItems(Integer nb_particle_finish_exchange,
95 Int32ConstArrayView local_ids,
96 Int32ConstArrayView sub_domains_to_send,
97 ItemGroup item_group,
98 IFunctor* functor)
99{
100 return m_bpe.exchangeItems(nb_particle_finish_exchange, local_ids,
101 sub_domains_to_send, item_group, functor);
102}
103
104/*---------------------------------------------------------------------------*/
105/*---------------------------------------------------------------------------*/
106
107bool AsyncParticleExchanger::
108exchangeItems(Integer nb_particle_finish_exchange,
109 Int32ConstArrayView local_ids,
110 Int32ConstArrayView sub_domains_to_send,
111 Int32Array* new_particle_local_ids,
112 IFunctor* functor)
113{
114 return m_bpe.exchangeItems(nb_particle_finish_exchange, local_ids,
115 sub_domains_to_send, new_particle_local_ids, functor);
116}
117
118/*---------------------------------------------------------------------------*/
119/*---------------------------------------------------------------------------*/
120
121void AsyncParticleExchanger::
122sendItems(Integer nb_particle_finish_exchange,
123 Int32ConstArrayView local_ids,
124 Int32ConstArrayView sub_domains_to_send)
125{
126 m_bpe.sendItems(nb_particle_finish_exchange, local_ids, sub_domains_to_send);
127}
128
129/*---------------------------------------------------------------------------*/
130/*---------------------------------------------------------------------------*/
131
132bool AsyncParticleExchanger::
133waitMessages(Integer nb_pending_particles,
134 Int32Array* new_particle_local_ids,
135 IFunctor* functor)
136{
137 return m_bpe.waitMessages(nb_pending_particles, new_particle_local_ids, functor);
138}
139
140/*---------------------------------------------------------------------------*/
141/*---------------------------------------------------------------------------*/
142
143void AsyncParticleExchanger::
144addNewParticles(Integer nb_particle)
145{
146 m_bpe.addNewParticles(nb_particle);
147}
148
149/*---------------------------------------------------------------------------*/
150/*---------------------------------------------------------------------------*/
151
152IItemFamily* AsyncParticleExchanger::
153itemFamily()
154{
155 return m_bpe.itemFamily();
156}
157
158/*---------------------------------------------------------------------------*/
159/*---------------------------------------------------------------------------*/
160
161void AsyncParticleExchanger::
162setVerboseLevel(Integer level)
163{
164 m_bpe.setVerboseLevel(level);
165}
166
167/*---------------------------------------------------------------------------*/
168/*---------------------------------------------------------------------------*/
169
170Integer AsyncParticleExchanger::
171verboseLevel() const
172{
173 return m_bpe.verboseLevel();
174}
175
176/*---------------------------------------------------------------------------*/
177/*---------------------------------------------------------------------------*/
178
179IAsyncParticleExchanger* AsyncParticleExchanger::
180asyncParticleExchanger()
181{
182 return this;
183}
184
185/*---------------------------------------------------------------------------*/
186/*---------------------------------------------------------------------------*/
187
188bool AsyncParticleExchanger::
189exchangeItemsAsync(Integer nb_particle_finish_exchange,
190 Int32ConstArrayView local_ids,
191 Int32ConstArrayView sub_domains_to_send,
192 Int32Array* new_particle_local_ids,
193 IFunctor* functor,
194 bool has_local_flying_particles)
195{
196 ARCANE_UNUSED(nb_particle_finish_exchange);
197 ARCANE_UNUSED(functor);
198
199 bool is_finished = false;
200 ++m_bpe.m_nb_loop;
201
202 //Génère tous les Isend et les Imrecv matchés avec Improbe
203 m_bpe.m_nb_particle_send = local_ids.size();
204 {
205 Timer::Sentry ts(m_bpe.m_timer);
206 _generateSendItemsAsync(local_ids, sub_domains_to_send);
207 }
208 if (m_bpe.m_verbose_level>=1)
209 info() << "ASE_BeginLoop loop=" << m_bpe.m_nb_loop;
210 m_bpe._sendPendingMessages();
211
212 if (new_particle_local_ids)
213 new_particle_local_ids->clear();
214
215 bool has_new_particle = _waitSomeMessages(ItemGroup(), new_particle_local_ids);
216 if (has_new_particle)
217 has_local_flying_particles = true;
218
219 //----------------------------------------
220 //Ici on a le coeur de l'algo de condition d'arrêt lors de l'utilisation de AsyncParticleExchanger
221 //
222 //Si taille de chunk == 0 && pas de req(red) en vol
223 //Si (Q > 0) avec Q le nombre de particule en vol (résultat du Iallreduce)
224 // Iallreduce (P, Q, req(red));
225 // P=0; avec P le nombre de particule envoyé depuis le dernier Iallreduce
226 //Sinon
227 // retourner is_finished = true
228 //
229
230 IParallelMng* pm = m_bpe.m_parallel_mng;
231 UniqueArray<Integer> isIallReduceRunning = pm->testSomeRequests(m_reduce_requests);
232
233 //Si la requête a matché, on clear le tableau de requête
234 if (isIallReduceRunning.size() != 0){
235 m_reduce_requests.clear();
236 if (m_bpe.m_verbose_level>=1)
237 info() << "PSM_IAllReduceFinished loop=" << m_bpe.m_nb_loop
238 << " total=" << m_sum_of_nb_particle_sent;
239 }
240
241 //Ici, on teste si on a des particules à traiter en local
242 //Qu'il n'y a pas de requête Iallreduce en vol
243 //et qu'il n'y a pas de requête à envoyer ou recevoir en vol
244 if ((!has_local_flying_particles) && (m_reduce_requests.size() == 0) && (m_bpe.m_waiting_messages.size() == 0) && (m_bpe.m_pending_messages.size()==0)) {
245 if (m_sum_of_nb_particle_sent > 0) {
246 //Faire MPI_Iallreduce
248 m_nb_particle_send_before_reduction = m_nb_particle_send_before_reduction_tmp;
249 if (m_bpe.m_verbose_level>=1)
250 info() << "PSM_DoIAllReduce loop=" << m_bpe.m_nb_loop
251 << " n=" << m_nb_particle_send_before_reduction
252 << " nb_to_send=" << local_ids.size();
253 m_reduce_requests.add(pnbc->allReduce(Parallel::ReduceSum,
254 ConstArrayView<Integer>(1, &m_nb_particle_send_before_reduction),
255 ArrayView<Integer>(1, &m_sum_of_nb_particle_sent)));
256 m_nb_particle_send_before_reduction_tmp = 0;
257 }
258 else {
259 is_finished = true; // is_finished = true, il n'y a plus de particules à traiter globalement
260 }
261 }
262 return is_finished;
263}
264
265/*---------------------------------------------------------------------------*/
266/*---------------------------------------------------------------------------*/
267
268void AsyncParticleExchanger::
269_generateSendItemsAsync(Int32ConstArrayView local_ids, Int32ConstArrayView sub_domains_to_send)
270{
271 Timer::Phase tphase(subDomain(), TP_Communication);
272
273 IMesh* mesh = m_bpe.m_item_family->mesh();
274
275 Int32UniqueArray communicating_sub_domains;
276 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
277
278 Integer nb_connected_sub_domain = communicating_sub_domains.size();
279
280 UniqueArray<SharedArray<Int32>> ids_to_send(nb_connected_sub_domain);
281 // Infos pour chaque sous-domaine connecté
282 m_bpe.m_accumulate_infos.clear();
283 m_bpe.m_accumulate_infos.resize(nb_connected_sub_domain);
284
285 m_bpe._addItemsToSend(local_ids, sub_domains_to_send, communicating_sub_domains, ids_to_send);
286
287 Int64UniqueArray items_to_send_uid;
288 Int64UniqueArray items_to_send_cells_uid; // Uniquement pour les particules;
289
290 IParallelMng* pm = m_bpe.m_parallel_mng;
291
292 //-------------------------------
293 // Gestion des envoies de particules
294 //
295 // [HT] En mode asynchrone, nous devons envoyer que si nous avons des particules
296 // et les réceptions se feront avec des MPI_Improbe
297 for (Integer j = 0; j < nb_connected_sub_domain; ++j) {
298 if (ids_to_send[j].size() != 0) {
299 auto* sm = new SerializeMessage(pm->commRank(), communicating_sub_domains[j],
300 ISerializeMessage::MT_Send);
301 m_bpe.m_accumulate_infos[j] = sm;
302 m_bpe._serializeMessage(sm, ids_to_send[j], items_to_send_uid, items_to_send_cells_uid);
303 m_bpe.m_pending_messages.add(sm);
304 m_nb_particle_send_before_reduction_tmp += ids_to_send[j].size();
305 }
306 }
307
308 //-------------------------------
309 // Gestion des réceptions de particules
310 //
311 // [HT] En mode asynchrone, les réceptions se font avec des MPI_Improbe et MPI_Imrecv
312 for (Integer j = 0; j < nb_connected_sub_domain; ++j) {
313
314 MessageTag tag(Arcane::MessagePassing::internal::BasicSerializeMessage::DEFAULT_SERIALIZE_TAG_VALUE);
315 MessageRank rank(communicating_sub_domains[j]);
316 PointToPointMessageInfo message(rank,tag);
317 message.setBlocking(false);
318 MessageId mid = pm->probe(message);
319
320 if (mid.isValid()) {
321 SerializeMessage* recv_sm = new SerializeMessage(m_bpe.subDomain()->subDomainId(), mid);
322 m_bpe.m_pending_messages.add(recv_sm);
323 }
324 }
325
326 m_bpe.m_accumulate_infos.clear();
327 // Détruit les entités qui viennent d'être envoyées
328 m_bpe.m_item_family->toParticleFamily()->removeParticles(local_ids);
329 m_bpe.m_item_family->endUpdate();
330}
331
332/*---------------------------------------------------------------------------*/
333/*---------------------------------------------------------------------------*/
334
335bool AsyncParticleExchanger::
336_waitSomeMessages(ItemGroup item_group, Int32Array* new_particle_local_ids)
337{
338 {
339 Timer::Sentry ts(m_bpe.m_timer);
340 m_bpe.m_message_list->waitMessages(Parallel::WaitSomeNonBlocking);
341 }
342 m_bpe.m_total_time_waiting += m_bpe.m_timer->lastActivationTime();
343
344 // Sauve les communications actuellements traitées car le traitement
345 // peut en ajouter de nouvelles
346 UniqueArray<ISerializeMessage*> current_messages(m_bpe.m_waiting_messages);
347
348 m_bpe.m_waiting_messages.clear();
349
350 Int64UniqueArray items_to_create_unique_id;
351 Int64UniqueArray items_to_create_cells_unique_id;
352 Int32UniqueArray items_to_create_local_id;
353 Int32UniqueArray items_to_create_cells_local_id;
354 bool has_new_particle = false;
355 for (Integer i = 0, is = current_messages.size(); i < is; ++i) {
356 ISerializeMessage* sm = current_messages[i];
357 if (sm->finished()) {
358 if (!sm->isSend()) { //Si le msg est un recv
359 m_bpe._deserializeMessage(sm, items_to_create_unique_id, items_to_create_cells_unique_id,
360 items_to_create_local_id, items_to_create_cells_local_id,
361 item_group, new_particle_local_ids);
362 // Indique qu'on a recu des particules et donc il faudrait dire
363 // que has_local_flying_particle est vra
364 if (!items_to_create_unique_id.empty())
365 has_new_particle = true;
366 }
367 delete sm;
368 }
369 else {
370 m_bpe.m_waiting_messages.add(sm);
371 }
372 }
373 return has_new_particle;
374}
375/*---------------------------------------------------------------------------*/
376/*---------------------------------------------------------------------------*/
377
378ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(AsyncParticleExchanger,
380 AsyncParticleExchanger);
381ARCANE_REGISTER_SUB_DOMAIN_FACTORY(AsyncParticleExchanger,
383 AsyncParticleExchanger);
384
385/*---------------------------------------------------------------------------*/
386/*---------------------------------------------------------------------------*/
387
388} // End namespace Arcane::mesh
389
390/*---------------------------------------------------------------------------*/
391/*---------------------------------------------------------------------------*/
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_REGISTER_SUB_DOMAIN_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
#define ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
Integer size() const
Nombre d'éléments du vecteur.
bool empty() const
Capacité (nombre d'éléments alloués) du vecteur.
Vue modifiable d'un tableau d'un type T.
void clear()
Supprime les éléments du tableau.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Interface d'un échangeur de particules asynchrone.
Interface d'une famille d'entités.
Definition IItemFamily.h:84
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual MessageId probe(const PointToPointMessageInfo &message)=0
Sonde si des messages sont disponibles.
virtual IParallelNonBlockingCollective * nonBlockingCollective() const =0
Interface des opérations collectives non blocantes.
virtual UniqueArray< Integer > testSomeRequests(ArrayView< Request > rvalues)=0
Test si une des requêtes rvalues est terminée.
virtual bool isParallel() const =0
Retourne true si l'exécution est parallèle.
Interface des opérations parallèles collectives non bloquantes.
virtual Request allReduce(eReduceType rt, ConstArrayView< char > send_buf, ArrayView< char > recv_buf)=0
Effectue la réduction de type rt sur le tableau send_buf et stoque le résultat dans recv_buf.
Interface d'un échangeur de particules.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Interface d'un message de sérialisation entre IMessagePassingMng.
virtual bool finished() const =0
true si le message est terminé
virtual bool isSend() const =0
true s'il faut envoyer, false s'il faut recevoir
Informations pour envoyer/recevoir un message point à point.
Exception lorsqu'une opération n'est pas supportée.
Message utilisant un SerializeBuffer.
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Sentinelle pour le timer. La sentinelle associée à un timer permet de déclancher celui-ci au moment d...
Definition Timer.h:89
TraceMessage info() const
Flot pour un message d'information.
Vecteur 1D de données avec sémantique par valeur (style STL).
@ ReduceSum
Somme des valeurs.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:426
Int32 Integer
Type représentant un entier.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:569
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:428
Array< Int32 > Int32Array
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:214