Arcane  v3.15.3.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
AsyncParticleExchanger.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* AsyncParticleExchanger.cc (C) 2000-2025 */
9/* */
10/* Echangeur de particules asynchrone. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/mesh/AsyncParticleExchanger.h"
15#include "arcane/core/IParallelNonBlockingCollective.h"
16
17#include "arccore/message_passing/BasicSerializeMessage.h"
18
19/*---------------------------------------------------------------------------*/
20/*---------------------------------------------------------------------------*/
21
22namespace Arcane::mesh
23{
24using namespace Arcane::MessagePassing;
25
26/*---------------------------------------------------------------------------*/
27/*---------------------------------------------------------------------------*/
28
29AsyncParticleExchanger::
30AsyncParticleExchanger(const ServiceBuildInfo& sbi)
32, m_bpe(sbi)
33{
34}
35
36/*---------------------------------------------------------------------------*/
37/*---------------------------------------------------------------------------*/
38
39AsyncParticleExchanger::
40~AsyncParticleExchanger()
41{
42}
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
46
47/*---------------------------------------------------------------------------*/
48/*---------------------------------------------------------------------------*/
49
50void AsyncParticleExchanger::
51build()
52{
53 m_bpe.build();
54 // Par défaut met à 0 le niveau de verbosité pour éviter trop de messages
55 // lors des phases asynchrones.
56 m_bpe.setVerboseLevel(0);
57}
58
59/*---------------------------------------------------------------------------*/
60/*---------------------------------------------------------------------------*/
61
62void AsyncParticleExchanger::
63initialize(IItemFamily* item_family)
64{
65 m_bpe.initialize(item_family);
66 IParallelMng* pm = m_bpe.m_parallel_mng;
67 if (pm->isParallel()){
69 if (!pnbc)
71 "AsyncParticleExchanger is not supported because NonBlocking"
72 " collectives are not available");
73 }
74}
75
76/*---------------------------------------------------------------------------*/
77/*---------------------------------------------------------------------------*/
78
79void AsyncParticleExchanger::
80beginNewExchange(Integer nb_particule)
81{
82 info() << "AsyncParticleExchanger is used. It also use BasicParticleExchanger functionnalities";
83 m_bpe.beginNewExchange(nb_particule);
84
85 m_nb_particle_send_before_reduction = 0;
86 m_nb_particle_send_before_reduction_tmp = 0;
87 m_sum_of_nb_particle_sent = 1;
88}
89
90/*---------------------------------------------------------------------------*/
91/*---------------------------------------------------------------------------*/
92
93bool AsyncParticleExchanger::
94exchangeItems(Integer nb_particle_finish_exchange,
98 IFunctor* functor)
99{
100 return m_bpe.exchangeItems(nb_particle_finish_exchange, local_ids,
102}
103
104/*---------------------------------------------------------------------------*/
105/*---------------------------------------------------------------------------*/
106
107bool AsyncParticleExchanger::
108exchangeItems(Integer nb_particle_finish_exchange,
112 IFunctor* functor)
113{
114 return m_bpe.exchangeItems(nb_particle_finish_exchange, local_ids,
116}
117
118/*---------------------------------------------------------------------------*/
119/*---------------------------------------------------------------------------*/
120
121void AsyncParticleExchanger::
122sendItems(Integer nb_particle_finish_exchange,
125{
127}
128
129/*---------------------------------------------------------------------------*/
130/*---------------------------------------------------------------------------*/
131
132bool AsyncParticleExchanger::
133waitMessages(Integer nb_pending_particles,
135 IFunctor* functor)
136{
137 return m_bpe.waitMessages(nb_pending_particles, new_particle_local_ids, functor);
138}
139
140/*---------------------------------------------------------------------------*/
141/*---------------------------------------------------------------------------*/
142
143void AsyncParticleExchanger::
144addNewParticles(Integer nb_particle)
145{
146 m_bpe.addNewParticles(nb_particle);
147}
148
149/*---------------------------------------------------------------------------*/
150/*---------------------------------------------------------------------------*/
151
152IItemFamily* AsyncParticleExchanger::
153itemFamily()
154{
155 return m_bpe.itemFamily();
156}
157
158/*---------------------------------------------------------------------------*/
159/*---------------------------------------------------------------------------*/
160
161void AsyncParticleExchanger::
162setVerboseLevel(Integer level)
163{
164 m_bpe.setVerboseLevel(level);
165}
166
167/*---------------------------------------------------------------------------*/
168/*---------------------------------------------------------------------------*/
169
170Integer AsyncParticleExchanger::
171verboseLevel() const
172{
173 return m_bpe.verboseLevel();
174}
175
176/*---------------------------------------------------------------------------*/
177/*---------------------------------------------------------------------------*/
178
179IAsyncParticleExchanger* AsyncParticleExchanger::
180asyncParticleExchanger()
181{
182 return this;
183}
184
185/*---------------------------------------------------------------------------*/
186/*---------------------------------------------------------------------------*/
187
188bool AsyncParticleExchanger::
189exchangeItemsAsync(Integer nb_particle_finish_exchange,
193 IFunctor* functor,
195{
196 ARCANE_UNUSED(nb_particle_finish_exchange);
197 ARCANE_UNUSED(functor);
198
199 bool is_finished = false;
200 ++m_bpe.m_nb_loop;
201
202 //Génère tous les Isend et les Imrecv matchés avec Improbe
203 m_bpe.m_nb_particle_send = local_ids.size();
204 {
205 Timer::Sentry ts(m_bpe.m_timer);
206 _generateSendItemsAsync(local_ids, sub_domains_to_send);
207 }
208 if (m_bpe.m_verbose_level>=1)
209 info() << "ASE_BeginLoop loop=" << m_bpe.m_nb_loop;
210 m_bpe._sendPendingMessages();
211
213 new_particle_local_ids->clear();
214
215 bool has_new_particle = _waitSomeMessages(ItemGroup(), new_particle_local_ids);
218
219 //----------------------------------------
220 //Ici on a le coeur de l'algo de condition d'arrêt lors de l'utilisation de AsyncParticleExchanger
221 //
222 //Si taille de chunk == 0 && pas de req(red) en vol
223 //Si (Q > 0) avec Q le nombre de particule en vol (résultat du Iallreduce)
224 // Iallreduce (P, Q, req(red));
225 // P=0; avec P le nombre de particule envoyé depuis le dernier Iallreduce
226 //Sinon
227 // retourner is_finished = true
228 //
229
230 IParallelMng* pm = m_bpe.m_parallel_mng;
232
233 //Si la requête a matché, on clear le tableau de requête
234 if (isIallReduceRunning.size() != 0){
235 m_reduce_requests.clear();
236 if (m_bpe.m_verbose_level>=1)
237 info() << "PSM_IAllReduceFinished loop=" << m_bpe.m_nb_loop
238 << " total=" << m_sum_of_nb_particle_sent;
239 }
240
241 //Ici, on teste si on a des particules à traiter en local
242 //Qu'il n'y a pas de requête Iallreduce en vol
243 //et qu'il n'y a pas de requête à envoyer ou recevoir en vol
244 if ((!has_local_flying_particles) && (m_reduce_requests.size() == 0) && (m_bpe.m_waiting_messages.size() == 0) && (m_bpe.m_pending_messages.size()==0)) {
245 if (m_sum_of_nb_particle_sent > 0) {
246 //Faire MPI_Iallreduce
248 m_nb_particle_send_before_reduction = m_nb_particle_send_before_reduction_tmp;
249 if (m_bpe.m_verbose_level>=1)
250 info() << "PSM_DoIAllReduce loop=" << m_bpe.m_nb_loop
251 << " n=" << m_nb_particle_send_before_reduction
252 << " nb_to_send=" << local_ids.size();
253 m_reduce_requests.add(pnbc->allReduce(Parallel::ReduceSum,
254 ConstArrayView<Integer>(1, &m_nb_particle_send_before_reduction),
255 ArrayView<Integer>(1, &m_sum_of_nb_particle_sent)));
256 m_nb_particle_send_before_reduction_tmp = 0;
257 }
258 else {
259 is_finished = true; // is_finished = true, il n'y a plus de particules à traiter globalement
260 }
261 }
262 return is_finished;
263}
264
265/*---------------------------------------------------------------------------*/
266/*---------------------------------------------------------------------------*/
267
268void AsyncParticleExchanger::
270{
271 Timer::Phase tphase(subDomain(), TP_Communication);
272
273 IMesh* mesh = m_bpe.m_item_family->mesh();
274
276 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
277
279
281 // Infos pour chaque sous-domaine connecté
282 m_bpe.m_accumulate_infos.clear();
283 m_bpe.m_accumulate_infos.resize(nb_connected_sub_domain);
284
286
288 Int64UniqueArray items_to_send_cells_uid; // Uniquement pour les particules;
289
290 IParallelMng* pm = m_bpe.m_parallel_mng;
291
292 //-------------------------------
293 // Gestion des envoies de particules
294 //
295 // [HT] En mode asynchrone, nous devons envoyer que si nous avons des particules
296 // et les réceptions se feront avec des MPI_Improbe
297 for (Integer j = 0; j < nb_connected_sub_domain; ++j) {
298 if (ids_to_send[j].size() != 0) {
300 ISerializeMessage::MT_Send);
301 m_bpe.m_accumulate_infos[j] = sm;
302 m_bpe._serializeMessage(sm, ids_to_send[j], items_to_send_uid, items_to_send_cells_uid);
303 m_bpe.m_pending_messages.add(sm);
304 m_nb_particle_send_before_reduction_tmp += ids_to_send[j].size();
305 }
306 }
307
308 //-------------------------------
309 // Gestion des réceptions de particules
310 //
311 // [HT] En mode asynchrone, les réceptions se font avec des MPI_Improbe et MPI_Imrecv
312 for (Integer j = 0; j < nb_connected_sub_domain; ++j) {
313
314 MessageTag tag(Arcane::MessagePassing::internal::BasicSerializeMessage::DEFAULT_SERIALIZE_TAG_VALUE);
316 PointToPointMessageInfo message(rank,tag);
317 message.setBlocking(false);
318 MessageId mid = pm->probe(message);
319
320 if (mid.isValid()) {
321 SerializeMessage* recv_sm = new SerializeMessage(m_bpe.subDomain()->subDomainId(), mid);
322 m_bpe.m_pending_messages.add(recv_sm);
323 }
324 }
325
326 m_bpe.m_accumulate_infos.clear();
327 // Détruit les entités qui viennent d'être envoyées
328 m_bpe.m_item_family->toParticleFamily()->removeParticles(local_ids);
329 m_bpe.m_item_family->endUpdate();
330}
331
332/*---------------------------------------------------------------------------*/
333/*---------------------------------------------------------------------------*/
334
335bool AsyncParticleExchanger::
337{
338 {
339 Timer::Sentry ts(m_bpe.m_timer);
340 m_bpe.m_message_list->waitMessages(Parallel::WaitSomeNonBlocking);
341 }
342 m_bpe.m_total_time_waiting += m_bpe.m_timer->lastActivationTime();
343
344 // Sauve les communications actuellements traitées car le traitement
345 // peut en ajouter de nouvelles
346 UniqueArray<ISerializeMessage*> current_messages(m_bpe.m_waiting_messages);
347
348 m_bpe.m_waiting_messages.clear();
349
354 bool has_new_particle = false;
355 for (Integer i = 0, is = current_messages.size(); i < is; ++i) {
357 if (sm->finished()) {
358 if (!sm->isSend()) { //Si le msg est un recv
362 // Indique qu'on a recu des particules et donc il faudrait dire
363 // que has_local_flying_particle est vra
364 if (!items_to_create_unique_id.empty())
365 has_new_particle = true;
366 }
367 delete sm;
368 }
369 else {
370 m_bpe.m_waiting_messages.add(sm);
371 }
372 }
373 return has_new_particle;
374}
375/*---------------------------------------------------------------------------*/
376/*---------------------------------------------------------------------------*/
377
378ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(AsyncParticleExchanger,
380 AsyncParticleExchanger);
381ARCANE_REGISTER_SUB_DOMAIN_FACTORY(AsyncParticleExchanger,
383 AsyncParticleExchanger);
384
385/*---------------------------------------------------------------------------*/
386/*---------------------------------------------------------------------------*/
387
388} // End namespace Arcane::mesh
389
390/*---------------------------------------------------------------------------*/
391/*---------------------------------------------------------------------------*/
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_REGISTER_SUB_DOMAIN_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
#define ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
virtual void build()
Construction de niveau build du service.
Tableau d'items de types quelconques.
Classe de base de service lié à un sous-domaine.
Interface d'un échangeur de particules asynchrone.
Interface d'une famille d'entités.
virtual IItemFamily * cellFamily()=0
Retourne la famille des mailles.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual MessageId probe(const PointToPointMessageInfo &message)=0
Sonde si des messages sont disponibles.
virtual IParallelNonBlockingCollective * nonBlockingCollective() const =0
Interface des opérations collectives non blocantes.
virtual UniqueArray< Integer > testSomeRequests(ArrayView< Request > rvalues)=0
Test si une des requêtes rvalues est terminée.
virtual bool isParallel() const =0
Retourne true si l'exécution est parallèle.
Interface des opérations parallèles collectives non bloquantes.
Interface d'un échangeur de particules.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:149
Message utilisant un SerializeBuffer.
Structure contenant les informations pour créer un service.
Positionne la phase de l'action en cours d'exécution.
Definition Timer.h:128
Sentinelle pour le timer. La sentinelle associée à un timer permet de déclancher celui-ci au moment d...
Definition Timer.h:89
Vue constante d'un tableau de type T.
Informations pour envoyer/recevoir un message point à point.
Exception lorsqu'une opération n'est pas supportée.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Definition Parallel.h:50
Int32 Integer
Type représentant un entier.