14#include "arcane/mesh/AsyncParticleExchanger.h"
15#include "arcane/core/IParallelNonBlockingCollective.h"
17#include "arcane/core/internal/SerializeMessage.h"
24using namespace Arcane::MessagePassing;
29AsyncParticleExchanger::
30AsyncParticleExchanger(
const ServiceBuildInfo& sbi)
39AsyncParticleExchanger::
40~AsyncParticleExchanger()
50void AsyncParticleExchanger::
56 m_bpe.setVerboseLevel(0);
62void AsyncParticleExchanger::
65 m_bpe.initialize(item_family);
71 "AsyncParticleExchanger is not supported because NonBlocking"
72 " collectives are not available");
79void AsyncParticleExchanger::
80beginNewExchange(
Integer nb_particule)
82 info() <<
"AsyncParticleExchanger is used. It also use BasicParticleExchanger functionnalities";
83 m_bpe.beginNewExchange(nb_particule);
85 m_nb_particle_send_before_reduction = 0;
86 m_nb_particle_send_before_reduction_tmp = 0;
87 m_sum_of_nb_particle_sent = 1;
93bool AsyncParticleExchanger::
94exchangeItems(
Integer nb_particle_finish_exchange,
100 return m_bpe.exchangeItems(nb_particle_finish_exchange, local_ids,
101 sub_domains_to_send, item_group, functor);
107bool AsyncParticleExchanger::
108exchangeItems(
Integer nb_particle_finish_exchange,
114 return m_bpe.exchangeItems(nb_particle_finish_exchange, local_ids,
115 sub_domains_to_send, new_particle_local_ids, functor);
121void AsyncParticleExchanger::
122sendItems(
Integer nb_particle_finish_exchange,
126 m_bpe.sendItems(nb_particle_finish_exchange, local_ids, sub_domains_to_send);
132bool AsyncParticleExchanger::
133waitMessages(
Integer nb_pending_particles,
137 return m_bpe.waitMessages(nb_pending_particles, new_particle_local_ids, functor);
143void AsyncParticleExchanger::
144addNewParticles(
Integer nb_particle)
146 m_bpe.addNewParticles(nb_particle);
155 return m_bpe.itemFamily();
161void AsyncParticleExchanger::
164 m_bpe.setVerboseLevel(level);
173 return m_bpe.verboseLevel();
180asyncParticleExchanger()
188bool AsyncParticleExchanger::
189exchangeItemsAsync(
Integer nb_particle_finish_exchange,
194 bool has_local_flying_particles)
196 ARCANE_UNUSED(nb_particle_finish_exchange);
197 ARCANE_UNUSED(functor);
199 bool is_finished =
false;
203 m_bpe.m_nb_particle_send = local_ids.
size();
206 _generateSendItemsAsync(local_ids, sub_domains_to_send);
208 if (m_bpe.m_verbose_level >= 1)
209 info() <<
"ASE_BeginLoop loop=" << m_bpe.m_nb_loop;
210 m_bpe._sendPendingMessages();
212 if (new_particle_local_ids)
213 new_particle_local_ids->
clear();
215 bool has_new_particle = _waitSomeMessages(
ItemGroup(), new_particle_local_ids);
216 if (has_new_particle)
217 has_local_flying_particles =
true;
234 if (isIallReduceRunning.
size() != 0) {
235 m_reduce_requests.clear();
236 if (m_bpe.m_verbose_level >= 1)
237 info() <<
"PSM_IAllReduceFinished loop=" << m_bpe.m_nb_loop
238 <<
" total=" << m_sum_of_nb_particle_sent;
244 if ((!has_local_flying_particles) && (m_reduce_requests.size() == 0) && (m_bpe.m_waiting_messages.size() == 0) && (m_bpe.m_pending_messages.size() == 0)) {
245 if (m_sum_of_nb_particle_sent > 0) {
248 m_nb_particle_send_before_reduction = m_nb_particle_send_before_reduction_tmp;
249 if (m_bpe.m_verbose_level >= 1)
250 info() <<
"PSM_DoIAllReduce loop=" << m_bpe.m_nb_loop
251 <<
" n=" << m_nb_particle_send_before_reduction
252 <<
" nb_to_send=" << local_ids.
size();
256 m_nb_particle_send_before_reduction_tmp = 0;
268void AsyncParticleExchanger::
273 IMesh*
mesh = m_bpe.m_item_family->mesh();
276 mesh->cellFamily()->getCommunicatingSubDomains(communicating_sub_domains);
278 Integer nb_connected_sub_domain = communicating_sub_domains.
size();
282 m_bpe.m_accumulate_infos.clear();
283 m_bpe.m_accumulate_infos.resize(nb_connected_sub_domain);
285 m_bpe._addItemsToSend(local_ids, sub_domains_to_send, communicating_sub_domains, ids_to_send);
297 for (
Integer j = 0; j < nb_connected_sub_domain; ++j) {
298 if (ids_to_send[j].size() != 0) {
300 ISerializeMessage::MT_Send);
301 m_bpe.m_accumulate_infos[j] = sm;
302 m_bpe._serializeMessage(sm, ids_to_send[j], items_to_send_uid, items_to_send_cells_uid);
303 m_bpe.m_pending_messages.add(sm);
304 m_nb_particle_send_before_reduction_tmp += ids_to_send[j].size();
312 for (
Integer j = 0; j < nb_connected_sub_domain; ++j) {
314 MessageTag tag(Arcane::MessagePassing::internal::BasicSerializeMessage::DEFAULT_SERIALIZE_TAG_VALUE);
317 message.setBlocking(
false);
322 m_bpe.m_pending_messages.add(recv_sm);
326 m_bpe.m_accumulate_infos.clear();
328 m_bpe.m_item_family->toParticleFamily()->removeParticles(local_ids);
329 m_bpe.m_item_family->endUpdate();
335bool AsyncParticleExchanger::
342 m_bpe.m_total_time_waiting += m_bpe.m_timer->lastActivationTime();
348 m_bpe.m_waiting_messages.clear();
354 bool has_new_particle =
false;
355 for (
Integer i = 0, is = current_messages.size(); i < is; ++i) {
359 m_bpe._deserializeMessage(sm, items_to_create_unique_id, items_to_create_cells_unique_id,
360 items_to_create_local_id, items_to_create_cells_local_id,
361 item_group, new_particle_local_ids);
364 if (!items_to_create_unique_id.
empty())
365 has_new_particle =
true;
370 m_bpe.m_waiting_messages.add(sm);
373 return has_new_particle;
380 AsyncParticleExchanger);
383 AsyncParticleExchanger);
#define ARCANE_THROW(exception_class,...)
Macro for throwing an exception with formatting.
#define ARCANE_REGISTER_SUB_DOMAIN_FACTORY(aclass, ainterface, aname)
Registers a factory service for the class aclass.
#define ARCANE_REGISTER_CASE_OPTIONS_NOAXL_FACTORY(aclass, ainterface, aname)
Registers a factory service for the class aclass.
Integer size() const
Number of elements in the vector.
bool empty() const
Capacity (number of allocated elements) of the vector.
Modifiable view of an array of type T.
void clear()
Removes the elements from the array.
Constant view of an array of type T.
constexpr Integer size() const noexcept
Number of elements in the array.
Interface of an asynchronous particle exchanger.
Interface of an entity family.
Interface of the parallelism manager for a subdomain.
virtual Int32 commRank() const =0
Rank of this instance in the communicator.
virtual MessageId probe(const PointToPointMessageInfo &message)=0
Probes if messages are available.
virtual IParallelNonBlockingCollective * nonBlockingCollective() const =0
Interface for non-blocking collective operations.
virtual UniqueArray< Integer > testSomeRequests(ArrayView< Request > rvalues)=0
Tests if one of the rvalues requests is complete.
virtual bool isParallel() const =0
Returns true if the execution is parallel.
Interface for non-blocking collective parallel operations.
virtual Request allReduce(eReduceType rt, ConstArrayView< char > send_buf, ArrayView< char > recv_buf)=0
Performs the reduction of type rt on the array send_buf and stores the result in recv_buf.
Interface of a particle exchanger.
Interface for a serialization message between IMessagePassingMng.
virtual bool finished() const =0
true if the message is finished
virtual bool isSend() const =0
true if it should send, false if it should receive
Information for sending/receiving a point-to-point message.
Exception when an operation is not supported.
Message using a SerializeBuffer.
Positions the phase of the currently executing action.
Sentinel for the timer. The sentinel associated with a timer allows it to be triggered upon its const...
TraceMessage info() const
Flow for an information message.
1D data vector with value semantics (STL style).
@ ReduceSum
Sum of values.
UniqueArray< Int64 > Int64UniqueArray
Dynamic 1D array of 64-bit integers.
Int32 Integer
Type representing an integer.
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
UniqueArray< Int32 > Int32UniqueArray
Dynamic 1D array of 32-bit integers.
Array< Int32 > Int32Array
Dynamic one-dimensional array of 32-bit integers.