Arcane  v3.15.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
AsyncParticleExchanger.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2022 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* AsyncParticleExchanger.cc (C) 2000-2021 */
9/* */
10/* Echangeur de particules asynchrone. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/mesh/AsyncParticleExchanger.h"
15#include "arcane/IParallelNonBlockingCollective.h"
16
17/*---------------------------------------------------------------------------*/
18/*---------------------------------------------------------------------------*/
19
20namespace Arcane::mesh
21{
22
23/*---------------------------------------------------------------------------*/
24/*---------------------------------------------------------------------------*/
25
26AsyncParticleExchanger::
27AsyncParticleExchanger(const ServiceBuildInfo& sbi)
28: BasicService(sbi)
29, m_bpe(sbi)
30{
31}
32
33/*---------------------------------------------------------------------------*/
34/*---------------------------------------------------------------------------*/
35
36AsyncParticleExchanger::
37~AsyncParticleExchanger()
38{
39}
40
41/*---------------------------------------------------------------------------*/
42/*---------------------------------------------------------------------------*/
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
46
47void AsyncParticleExchanger::
48build()
49{
50 m_bpe.build();
51 // Par défaut met à 0 le niveau de verbosité pour éviter trop de messages
52 // lors des phases asynchrones.
53 m_bpe.setVerboseLevel(0);
54}
55
56/*---------------------------------------------------------------------------*/
57/*---------------------------------------------------------------------------*/
58
59void AsyncParticleExchanger::
60initialize(IItemFamily* item_family)
61{
62 m_bpe.initialize(item_family);
63 IParallelMng* pm = m_bpe.m_parallel_mng;
64 if (pm->isParallel()){
66 if (!pnbc)
68 "AsyncParticleExchanger is not supported because NonBlocking"
69 " collectives are not available");
70 }
71}
72
73/*---------------------------------------------------------------------------*/
74/*---------------------------------------------------------------------------*/
75
76void AsyncParticleExchanger::
77beginNewExchange(Integer nb_particule)
78{
79 info() << "AsyncParticleExchanger is used. It also use BasicParticleExchanger functionnalities";
80 m_bpe.beginNewExchange(nb_particule);
81
82 m_nb_particle_send_before_reduction = 0;
83 m_nb_particle_send_before_reduction_tmp = 0;
84 m_sum_of_nb_particle_sent = 1;
85}
86
87/*---------------------------------------------------------------------------*/
88/*---------------------------------------------------------------------------*/
89
90bool AsyncParticleExchanger::
91exchangeItems(Integer nb_particle_finish_exchange,
95 IFunctor* functor)
96{
97 return m_bpe.exchangeItems(nb_particle_finish_exchange, local_ids,
99}
100
101/*---------------------------------------------------------------------------*/
102/*---------------------------------------------------------------------------*/
103
104bool AsyncParticleExchanger::
105exchangeItems(Integer nb_particle_finish_exchange,
109 IFunctor* functor)
110{
111 return m_bpe.exchangeItems(nb_particle_finish_exchange, local_ids,
113}
114
115/*---------------------------------------------------------------------------*/
116/*---------------------------------------------------------------------------*/
117
118void AsyncParticleExchanger::
119sendItems(Integer nb_particle_finish_exchange,
122{
124}
125
126/*---------------------------------------------------------------------------*/
127/*---------------------------------------------------------------------------*/
128
129bool AsyncParticleExchanger::
130waitMessages(Integer nb_pending_particles,
132 IFunctor* functor)
133{
134 return m_bpe.waitMessages(nb_pending_particles, new_particle_local_ids, functor);
135}
136
137/*---------------------------------------------------------------------------*/
138/*---------------------------------------------------------------------------*/
139
140void AsyncParticleExchanger::
141addNewParticles(Integer nb_particle)
142{
143 m_bpe.addNewParticles(nb_particle);
144}
145
146/*---------------------------------------------------------------------------*/
147/*---------------------------------------------------------------------------*/
148
149IItemFamily* AsyncParticleExchanger::
150itemFamily()
151{
152 return m_bpe.itemFamily();
153}
154
155/*---------------------------------------------------------------------------*/
156/*---------------------------------------------------------------------------*/
157
158void AsyncParticleExchanger::
159setVerboseLevel(Integer level)
160{
161 m_bpe.setVerboseLevel(level);
162}
163
164/*---------------------------------------------------------------------------*/
165/*---------------------------------------------------------------------------*/
166
167Integer AsyncParticleExchanger::
168verboseLevel() const
169{
170 return m_bpe.verboseLevel();
171}
172
173/*---------------------------------------------------------------------------*/
174/*---------------------------------------------------------------------------*/
175
176IAsyncParticleExchanger* AsyncParticleExchanger::
177asyncParticleExchanger()
178{
179 return this;
180}
181
182/*---------------------------------------------------------------------------*/
183/*---------------------------------------------------------------------------*/
184
185bool AsyncParticleExchanger::
186exchangeItemsAsync(Integer nb_particle_finish_exchange,
190 IFunctor* functor,
192{
193 ARCANE_UNUSED(nb_particle_finish_exchange);
194 ARCANE_UNUSED(functor);
195
196 bool is_finished = false;
197 ++m_bpe.m_nb_loop;
198
199 //Génère tous les Isend et les Imrecv matchés avec Improbe
200 m_bpe.m_nb_particle_send = local_ids.size();
201 {
202 Timer::Sentry ts(m_bpe.m_timer);
203 _generateSendItemsAsync(local_ids, sub_domains_to_send);
204 }
205 if (m_bpe.m_verbose_level>=1)
206 info() << "ASE_BeginLoop loop=" << m_bpe.m_nb_loop;
207 m_bpe._sendPendingMessages();
208
210 new_particle_local_ids->clear();
211
212 bool has_new_particle = _waitSomeMessages(ItemGroup(), new_particle_local_ids);
215
216 //----------------------------------------
217 //Ici on a le coeur de l'algo de condition d'arrêt lors de l'utilisation de AsyncParticleExchanger
218 //
219 //Si taille de chunk == 0 && pas de req(red) en vol
220 //Si (Q > 0) avec Q le nombre de particule en vol (résultat du Iallreduce)
221 // Iallreduce (P, Q, req(red));
222 // P=0; avec P le nombre de particule envoyé depuis le dernier Iallreduce
223 //Sinon
224 // retourner is_finished = true
225 //
226
227 IParallelMng* pm = m_bpe.m_parallel_mng;
229
230 //Si la requête a matché, on clear le tableau de requête
231 if (isIallReduceRunning.size() != 0){
232 m_reduce_requests.clear();
233 if (m_bpe.m_verbose_level>=1)
234 info() << "PSM_IAllReduceFinished loop=" << m_bpe.m_nb_loop
235 << " total=" << m_sum_of_nb_particle_sent;
236 }
237
238 //Ici, on teste si on a des particules à traiter en local
239 //Qu'il n'y a pas de requête Iallreduce en vol
240 //et qu'il n'y a pas de requête à envoyer ou recevoir en vol
241 if ((!has_local_flying_particles) && (m_reduce_requests.size() == 0) && (m_bpe.m_waiting_messages.size() == 0) && (m_bpe.m_pending_messages.size()==0)) {
242 if (m_sum_of_nb_particle_sent > 0) {
243 //Faire MPI_Iallreduce
244 IParallelNonBlockingCollective* pnbc = pm->nonBlockingCollective();
245 m_nb_particle_send_before_reduction = m_nb_particle_send_before_reduction_tmp;
246 if (m_bpe.m_verbose_level>=1)
247 info() << "PSM_DoIAllReduce loop=" << m_bpe.m_nb_loop
248 << " n=" << m_nb_particle_send_before_reduction
249 << " nb_to_send=" << local_ids.size();
250 m_reduce_requests.add(pnbc->allReduce(Parallel::ReduceSum,
251 ConstArrayView<Integer>(1, &m_nb_particle_send_before_reduction),
252 ArrayView<Integer>(1, &m_sum_of_nb_particle_sent)));
253 m_nb_particle_send_before_reduction_tmp = 0;
254 }
255 else {
256 is_finished = true; // is_finished = true, il n'y a plus de particules à traiter globalement
257 }
258 }
259 return is_finished;
260}
261
262/*---------------------------------------------------------------------------*/
263/*---------------------------------------------------------------------------*/
264
265void AsyncParticleExchanger::
266_generateSendItemsAsync(Int32ConstArrayView local_ids, Int32ConstArrayView sub_domains_to_send)
267{
268 Timer::Phase tphase(subDomain(), TP_Communication);
269
270 IMesh* mesh = m_bpe.m_item_family->mesh();
271
272 Int32UniqueArray communicating_sub_domains;
273 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
274
275 Integer nb_connected_sub_domain = communicating_sub_domains.size();
276
277 UniqueArray<SharedArray<Int32>> ids_to_send(nb_connected_sub_domain);
278 // Infos pour chaque sous-domaine connecté
279 m_bpe.m_accumulate_infos.clear();
280 m_bpe.m_accumulate_infos.resize(nb_connected_sub_domain);
281
282 m_bpe._addItemsToSend(local_ids, sub_domains_to_send, communicating_sub_domains, ids_to_send);
283
284 Int64UniqueArray items_to_send_uid;
285 Int64UniqueArray items_to_send_cells_uid; // Uniquement pour les particules;
286
287 IParallelMng* pm = m_bpe.m_parallel_mng;
288
289 //-------------------------------
290 // Gestion des envoies de particules
291 //
292 // [HT] En mode asynchrone, nous devons envoyer que si nous avons des particules
293 // et les réceptions se feront avec des MPI_Improbe
294 for (Integer j = 0; j < nb_connected_sub_domain; ++j) {
295 if (ids_to_send[j].size() != 0) {
296 auto* sm = new SerializeMessage(pm->commRank(), communicating_sub_domains[j],
297 ISerializeMessage::MT_Send);
298 m_bpe.m_accumulate_infos[j] = sm;
299 m_bpe._serializeMessage(sm, ids_to_send[j], items_to_send_uid, items_to_send_cells_uid);
300 m_bpe.m_pending_messages.add(sm);
301 m_nb_particle_send_before_reduction_tmp += ids_to_send[j].size();
302 }
303 }
304
305 //-------------------------------
306 // Gestion des réceptions de particules
307 //
308 // [HT] En mode asynchrone, les réceptions se font avec des MPI_Improbe et MPI_Imrecv
309 for (Integer j = 0; j < nb_connected_sub_domain; ++j) {
310
311 MessageTag tag(BasicSerializeMessage::DEFAULT_SERIALIZE_TAG_VALUE);
312 MessageRank rank(communicating_sub_domains[j]);
313 PointToPointMessageInfo message(rank,tag);
314 message.setBlocking(false);
315 MessageId mid = pm->probe(message);
316
317 if (mid.isValid()) {
318 SerializeMessage* recv_sm = new SerializeMessage(m_bpe.subDomain()->subDomainId(), mid);
319 m_bpe.m_pending_messages.add(recv_sm);
320 }
321 }
322
323 m_bpe.m_accumulate_infos.clear();
324 // Détruit les entités qui viennent d'être envoyées
325 m_bpe.m_item_family->toParticleFamily()->removeParticles(local_ids);
326 m_bpe.m_item_family->endUpdate();
327}
328
329/*---------------------------------------------------------------------------*/
330/*---------------------------------------------------------------------------*/
331
332bool AsyncParticleExchanger::
333_waitSomeMessages(ItemGroup item_group, Int32Array* new_particle_local_ids)
334{
335 {
336 Timer::Sentry ts(m_bpe.m_timer);
337 m_bpe.m_message_list->waitMessages(Parallel::WaitSomeNonBlocking);
338 }
339 m_bpe.m_total_time_waiting += m_bpe.m_timer->lastActivationTime();
340
341 // Sauve les communications actuellements traitées car le traitement
342 // peut en ajouter de nouvelles
343 UniqueArray<ISerializeMessage*> current_messages(m_bpe.m_waiting_messages);
344
345 m_bpe.m_waiting_messages.clear();
346
347 Int64UniqueArray items_to_create_unique_id;
348 Int64UniqueArray items_to_create_cells_unique_id;
349 Int32UniqueArray items_to_create_local_id;
350 Int32UniqueArray items_to_create_cells_local_id;
351 bool has_new_particle = false;
352 for (Integer i = 0, is = current_messages.size(); i < is; ++i) {
353 ISerializeMessage* sm = current_messages[i];
354 if (sm->finished()) {
355 if (!sm->isSend()) { //Si le msg est un recv
356 m_bpe._deserializeMessage(sm, items_to_create_unique_id, items_to_create_cells_unique_id,
357 items_to_create_local_id, items_to_create_cells_local_id,
358 item_group, new_particle_local_ids);
359 // Indique qu'on a recu des particules et donc il faudrait dire
360 // que has_local_flying_particle est vra
361 if (!items_to_create_unique_id.empty())
362 has_new_particle = true;
363 }
364 delete sm;
365 }
366 else {
367 m_bpe.m_waiting_messages.add(sm);
368 }
369 }
370 return has_new_particle;
371}
372/*---------------------------------------------------------------------------*/
373/*---------------------------------------------------------------------------*/
374
375ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(AsyncParticleExchanger,
376 IParticleExchanger,
377 AsyncParticleExchanger);
378ARCANE_REGISTER_SUB_DOMAIN_FACTORY(AsyncParticleExchanger,
379 IParticleExchanger,
380 AsyncParticleExchanger);
381
382/*---------------------------------------------------------------------------*/
383/*---------------------------------------------------------------------------*/
384
385} // End namespace Arcane::mesh
386
387/*---------------------------------------------------------------------------*/
388/*---------------------------------------------------------------------------*/
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_REGISTER_SUB_DOMAIN_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
#define ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
Tableau d'items de types quelconques.
Interface d'un échangeur de particules asynchrone.
Interface d'une famille d'entités.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual MessageId probe(const PointToPointMessageInfo &message)=0
Sonde si des messages sont disponibles.
virtual IParallelNonBlockingCollective * nonBlockingCollective() const =0
Interface des opérations collectives non blocantes.
virtual UniqueArray< Integer > testSomeRequests(ArrayView< Request > rvalues)=0
Test si une des requêtes rvalues est terminée.
virtual bool isParallel() const =0
Retourne true si l'exécution est parallèle.
Interface des opérations parallèles collectives non bloquantes.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:149
Sentinelle pour le timer. La sentinelle associée à un timer permet de déclancher celui-ci au moment d...
Definition Timer.h:89
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Exception lorsqu'une opération n'est pas supportée.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:550
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:552
Int32 Integer
Type représentant un entier.