14#include "arcane/mesh/AsyncParticleExchanger.h"
15#include "arcane/core/IParallelNonBlockingCollective.h"
17#include "arcane/core/internal/SerializeMessage.h"
24using namespace Arcane::MessagePassing;
29AsyncParticleExchanger::
30AsyncParticleExchanger(
const ServiceBuildInfo& sbi)
39AsyncParticleExchanger::
40~AsyncParticleExchanger()
50void AsyncParticleExchanger::
56 m_bpe.setVerboseLevel(0);
62void AsyncParticleExchanger::
65 m_bpe.initialize(item_family);
71 "AsyncParticleExchanger is not supported because NonBlocking"
72 " collectives are not available");
79void AsyncParticleExchanger::
80beginNewExchange(
Integer nb_particule)
82 info() <<
"AsyncParticleExchanger is used. It also use BasicParticleExchanger functionnalities";
83 m_bpe.beginNewExchange(nb_particule);
85 m_nb_particle_send_before_reduction = 0;
86 m_nb_particle_send_before_reduction_tmp = 0;
87 m_sum_of_nb_particle_sent = 1;
93bool AsyncParticleExchanger::
94exchangeItems(
Integer nb_particle_finish_exchange,
100 return m_bpe.exchangeItems(nb_particle_finish_exchange, local_ids,
101 sub_domains_to_send, item_group, functor);
107bool AsyncParticleExchanger::
108exchangeItems(
Integer nb_particle_finish_exchange,
114 return m_bpe.exchangeItems(nb_particle_finish_exchange, local_ids,
115 sub_domains_to_send, new_particle_local_ids, functor);
121void AsyncParticleExchanger::
122sendItems(
Integer nb_particle_finish_exchange,
126 m_bpe.sendItems(nb_particle_finish_exchange, local_ids, sub_domains_to_send);
132bool AsyncParticleExchanger::
133waitMessages(
Integer nb_pending_particles,
137 return m_bpe.waitMessages(nb_pending_particles, new_particle_local_ids, functor);
143void AsyncParticleExchanger::
144addNewParticles(
Integer nb_particle)
146 m_bpe.addNewParticles(nb_particle);
155 return m_bpe.itemFamily();
161void AsyncParticleExchanger::
164 m_bpe.setVerboseLevel(level);
173 return m_bpe.verboseLevel();
180asyncParticleExchanger()
188bool AsyncParticleExchanger::
189exchangeItemsAsync(
Integer nb_particle_finish_exchange,
194 bool has_local_flying_particles)
196 ARCANE_UNUSED(nb_particle_finish_exchange);
197 ARCANE_UNUSED(functor);
199 bool is_finished =
false;
203 m_bpe.m_nb_particle_send = local_ids.
size();
206 _generateSendItemsAsync(local_ids, sub_domains_to_send);
208 if (m_bpe.m_verbose_level>=1)
209 info() <<
"ASE_BeginLoop loop=" << m_bpe.m_nb_loop;
210 m_bpe._sendPendingMessages();
212 if (new_particle_local_ids)
213 new_particle_local_ids->
clear();
215 bool has_new_particle = _waitSomeMessages(
ItemGroup(), new_particle_local_ids);
216 if (has_new_particle)
217 has_local_flying_particles =
true;
234 if (isIallReduceRunning.
size() != 0){
235 m_reduce_requests.clear();
236 if (m_bpe.m_verbose_level>=1)
237 info() <<
"PSM_IAllReduceFinished loop=" << m_bpe.m_nb_loop
238 <<
" total=" << m_sum_of_nb_particle_sent;
244 if ((!has_local_flying_particles) && (m_reduce_requests.size() == 0) && (m_bpe.m_waiting_messages.size() == 0) && (m_bpe.m_pending_messages.size()==0)) {
245 if (m_sum_of_nb_particle_sent > 0) {
248 m_nb_particle_send_before_reduction = m_nb_particle_send_before_reduction_tmp;
249 if (m_bpe.m_verbose_level>=1)
250 info() <<
"PSM_DoIAllReduce loop=" << m_bpe.m_nb_loop
251 <<
" n=" << m_nb_particle_send_before_reduction
252 <<
" nb_to_send=" << local_ids.
size();
256 m_nb_particle_send_before_reduction_tmp = 0;
268void AsyncParticleExchanger::
273 IMesh* mesh = m_bpe.m_item_family->mesh();
276 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
278 Integer nb_connected_sub_domain = communicating_sub_domains.
size();
282 m_bpe.m_accumulate_infos.clear();
283 m_bpe.m_accumulate_infos.resize(nb_connected_sub_domain);
285 m_bpe._addItemsToSend(local_ids, sub_domains_to_send, communicating_sub_domains, ids_to_send);
297 for (
Integer j = 0; j < nb_connected_sub_domain; ++j) {
298 if (ids_to_send[j].size() != 0) {
300 ISerializeMessage::MT_Send);
301 m_bpe.m_accumulate_infos[j] = sm;
302 m_bpe._serializeMessage(sm, ids_to_send[j], items_to_send_uid, items_to_send_cells_uid);
303 m_bpe.m_pending_messages.add(sm);
304 m_nb_particle_send_before_reduction_tmp += ids_to_send[j].size();
312 for (
Integer j = 0; j < nb_connected_sub_domain; ++j) {
314 MessageTag tag(Arcane::MessagePassing::internal::BasicSerializeMessage::DEFAULT_SERIALIZE_TAG_VALUE);
317 message.setBlocking(
false);
322 m_bpe.m_pending_messages.add(recv_sm);
326 m_bpe.m_accumulate_infos.clear();
328 m_bpe.m_item_family->toParticleFamily()->removeParticles(local_ids);
329 m_bpe.m_item_family->endUpdate();
335bool AsyncParticleExchanger::
342 m_bpe.m_total_time_waiting += m_bpe.m_timer->lastActivationTime();
348 m_bpe.m_waiting_messages.clear();
354 bool has_new_particle =
false;
355 for (
Integer i = 0, is = current_messages.size(); i < is; ++i) {
359 m_bpe._deserializeMessage(sm, items_to_create_unique_id, items_to_create_cells_unique_id,
360 items_to_create_local_id, items_to_create_cells_local_id,
361 item_group, new_particle_local_ids);
364 if (!items_to_create_unique_id.
empty())
365 has_new_particle =
true;
370 m_bpe.m_waiting_messages.add(sm);
373 return has_new_particle;
380 AsyncParticleExchanger);
383 AsyncParticleExchanger);
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_REGISTER_SUB_DOMAIN_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
#define ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
Integer size() const
Nombre d'éléments du vecteur.
bool empty() const
Capacité (nombre d'éléments alloués) du vecteur.
Vue modifiable d'un tableau d'un type T.
void clear()
Supprime les éléments du tableau.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Interface d'un échangeur de particules asynchrone.
Interface d'une famille d'entités.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual MessageId probe(const PointToPointMessageInfo &message)=0
Sonde si des messages sont disponibles.
virtual IParallelNonBlockingCollective * nonBlockingCollective() const =0
Interface des opérations collectives non blocantes.
virtual UniqueArray< Integer > testSomeRequests(ArrayView< Request > rvalues)=0
Test si une des requêtes rvalues est terminée.
virtual bool isParallel() const =0
Retourne true si l'exécution est parallèle.
Interface des opérations parallèles collectives non bloquantes.
virtual Request allReduce(eReduceType rt, ConstArrayView< char > send_buf, ArrayView< char > recv_buf)=0
Effectue la réduction de type rt sur le tableau send_buf et stoque le résultat dans recv_buf.
Interface d'un échangeur de particules.
Groupe d'entités de maillage.
Interface d'un message de sérialisation entre IMessagePassingMng.
virtual bool finished() const =0
true si le message est terminé
virtual bool isSend() const =0
true s'il faut envoyer, false s'il faut recevoir
Informations pour envoyer/recevoir un message point à point.
Exception lorsqu'une opération n'est pas supportée.
Message utilisant un SerializeBuffer.
Positionne la phase de l'action en cours d'exécution.
Sentinelle pour le timer. La sentinelle associée à un timer permet de déclancher celui-ci au moment d...
TraceMessage info() const
Flot pour un message d'information.
Vecteur 1D de données avec sémantique par valeur (style STL).
@ ReduceSum
Somme des valeurs.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
Int32 Integer
Type représentant un entier.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Array< Int32 > Int32Array
Tableau dynamique à une dimension d'entiers 32 bits.