14#include "arcane/mesh/AsyncParticleExchanger.h"
15#include "arcane/IParallelNonBlockingCollective.h"
26AsyncParticleExchanger::
27AsyncParticleExchanger(
const ServiceBuildInfo& sbi)
36AsyncParticleExchanger::
37~AsyncParticleExchanger()
47void AsyncParticleExchanger::
53 m_bpe.setVerboseLevel(0);
59void AsyncParticleExchanger::
68 "AsyncParticleExchanger is not supported because NonBlocking"
69 " collectives are not available");
76void AsyncParticleExchanger::
79 info() <<
"AsyncParticleExchanger is used. It also use BasicParticleExchanger functionnalities";
82 m_nb_particle_send_before_reduction = 0;
83 m_nb_particle_send_before_reduction_tmp = 0;
84 m_sum_of_nb_particle_sent = 1;
90bool AsyncParticleExchanger::
104bool AsyncParticleExchanger::
118void AsyncParticleExchanger::
129bool AsyncParticleExchanger::
140void AsyncParticleExchanger::
152 return m_bpe.itemFamily();
158void AsyncParticleExchanger::
159setVerboseLevel(Integer level)
161 m_bpe.setVerboseLevel(level);
167Integer AsyncParticleExchanger::
170 return m_bpe.verboseLevel();
177asyncParticleExchanger()
185bool AsyncParticleExchanger::
194 ARCANE_UNUSED(functor);
200 m_bpe.m_nb_particle_send =
local_ids.size();
205 if (m_bpe.m_verbose_level>=1)
206 info() <<
"ASE_BeginLoop loop=" << m_bpe.m_nb_loop;
207 m_bpe._sendPendingMessages();
232 m_reduce_requests.clear();
233 if (m_bpe.m_verbose_level>=1)
234 info() <<
"PSM_IAllReduceFinished loop=" << m_bpe.m_nb_loop
235 <<
" total=" << m_sum_of_nb_particle_sent;
241 if ((!has_local_flying_particles) && (m_reduce_requests.size() == 0) && (m_bpe.m_waiting_messages.size() == 0) && (m_bpe.m_pending_messages.size()==0)) {
242 if (m_sum_of_nb_particle_sent > 0) {
245 m_nb_particle_send_before_reduction = m_nb_particle_send_before_reduction_tmp;
246 if (m_bpe.m_verbose_level>=1)
247 info() <<
"PSM_DoIAllReduce loop=" << m_bpe.m_nb_loop
248 <<
" n=" << m_nb_particle_send_before_reduction
249 <<
" nb_to_send=" << local_ids.
size();
250 m_reduce_requests.add(pnbc->allReduce(Parallel::ReduceSum,
251 ConstArrayView<Integer>(1, &m_nb_particle_send_before_reduction),
252 ArrayView<Integer>(1, &m_sum_of_nb_particle_sent)));
253 m_nb_particle_send_before_reduction_tmp = 0;
265void AsyncParticleExchanger::
266_generateSendItemsAsync(Int32ConstArrayView local_ids, Int32ConstArrayView sub_domains_to_send)
268 Timer::Phase tphase(subDomain(), TP_Communication);
270 IMesh* mesh = m_bpe.m_item_family->mesh();
273 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
275 Integer nb_connected_sub_domain = communicating_sub_domains.size();
277 UniqueArray<SharedArray<Int32>> ids_to_send(nb_connected_sub_domain);
279 m_bpe.m_accumulate_infos.clear();
280 m_bpe.m_accumulate_infos.resize(nb_connected_sub_domain);
282 m_bpe._addItemsToSend(local_ids, sub_domains_to_send, communicating_sub_domains, ids_to_send);
287 IParallelMng* pm = m_bpe.m_parallel_mng;
294 for (Integer j = 0; j < nb_connected_sub_domain; ++j) {
295 if (ids_to_send[j].size() != 0) {
296 auto* sm =
new SerializeMessage(pm->
commRank(), communicating_sub_domains[j],
297 ISerializeMessage::MT_Send);
298 m_bpe.m_accumulate_infos[j] = sm;
299 m_bpe._serializeMessage(sm, ids_to_send[j], items_to_send_uid, items_to_send_cells_uid);
300 m_bpe.m_pending_messages.add(sm);
301 m_nb_particle_send_before_reduction_tmp += ids_to_send[j].size();
309 for (Integer j = 0; j < nb_connected_sub_domain; ++j) {
311 MessageTag tag(BasicSerializeMessage::DEFAULT_SERIALIZE_TAG_VALUE);
312 MessageRank rank(communicating_sub_domains[j]);
313 PointToPointMessageInfo message(rank,tag);
314 message.setBlocking(
false);
315 MessageId mid = pm->
probe(message);
318 SerializeMessage* recv_sm =
new SerializeMessage(m_bpe.subDomain()->subDomainId(), mid);
319 m_bpe.m_pending_messages.add(recv_sm);
323 m_bpe.m_accumulate_infos.clear();
325 m_bpe.m_item_family->toParticleFamily()->removeParticles(local_ids);
326 m_bpe.m_item_family->endUpdate();
332bool AsyncParticleExchanger::
333_waitSomeMessages(ItemGroup item_group, Int32Array* new_particle_local_ids)
336 Timer::Sentry ts(m_bpe.m_timer);
337 m_bpe.m_message_list->waitMessages(Parallel::WaitSomeNonBlocking);
339 m_bpe.m_total_time_waiting += m_bpe.m_timer->lastActivationTime();
343 UniqueArray<ISerializeMessage*> current_messages(m_bpe.m_waiting_messages);
345 m_bpe.m_waiting_messages.clear();
351 bool has_new_particle =
false;
352 for (Integer i = 0, is = current_messages.size(); i < is; ++i) {
353 ISerializeMessage* sm = current_messages[i];
354 if (sm->finished()) {
356 m_bpe._deserializeMessage(sm, items_to_create_unique_id, items_to_create_cells_unique_id,
357 items_to_create_local_id, items_to_create_cells_local_id,
358 item_group, new_particle_local_ids);
361 if (!items_to_create_unique_id.empty())
362 has_new_particle =
true;
367 m_bpe.m_waiting_messages.add(sm);
370 return has_new_particle;
377 AsyncParticleExchanger);
380 AsyncParticleExchanger);
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_REGISTER_SUB_DOMAIN_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
#define ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
Tableau d'items de types quelconques.
Interface d'un échangeur de particules asynchrone.
Interface d'une famille d'entités.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual MessageId probe(const PointToPointMessageInfo &message)=0
Sonde si des messages sont disponibles.
virtual IParallelNonBlockingCollective * nonBlockingCollective() const =0
Interface des opérations collectives non blocantes.
virtual UniqueArray< Integer > testSomeRequests(ArrayView< Request > rvalues)=0
Test si une des requêtes rvalues est terminée.
virtual bool isParallel() const =0
Retourne true si l'exécution est parallèle.
Interface des opérations parallèles collectives non bloquantes.
Groupe d'entités de maillage.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Sentinelle pour le timer. La sentinelle associée à un timer permet de déclancher celui-ci au moment d...
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Exception lorsqu'une opération n'est pas supportée.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Int32 Integer
Type représentant un entier.