14#include "arcane/utils/FatalErrorException.h"
15#include "arcane/utils/MemoryView.h"
17#include "arcane/parallel/mpi/MpiParallelMng.h"
18#include "arcane/parallel/mpi/MpiAdapter.h"
19#include "arcane/parallel/mpi/MpiTimeInterval.h"
20#include "arcane/parallel/IStat.h"
22#include "arcane/impl/IDataSynchronizeBuffer.h"
23#include "arcane/impl/IDataSynchronizeImplementation.h"
25#include "arccore/message_passing/IRequestList.h"
65 void compute() override {}
78 bool _isSkipRank(Int32 rank, Int32 sequence) const;
90 : m_mpi_parallel_mng(mpi_pm)
91 , m_block_size(block_size)
104 Int32 m_block_size = 0;
105 Int32 m_nb_sequence = 0;
121MpiBlockVariableSynchronizerDispatcher::
122MpiBlockVariableSynchronizerDispatcher(Factory* f)
123: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
124, m_request_list(m_mpi_parallel_mng->createRequestListRef())
125, m_block_size(f->m_block_size)
126, m_nb_sequence(f->m_nb_sequence)
133bool MpiBlockVariableSynchronizerDispatcher::
136 if (m_nb_sequence == 1)
138 return (rank % m_nb_sequence) == sequence;
144void MpiBlockVariableSynchronizerDispatcher::
145beginSynchronize(IDataSynchronizeBuffer* vs_buf)
153 double send_copy_time = 0.0;
155 MpiTimeInterval tit(&send_copy_time);
157 vs_buf->copyAllSend();
159 Int64 total_share_size = vs_buf->totalSendSize();
160 m_mpi_parallel_mng->stat()->add("SyncSendCopy", send_copy_time, total_share_size);
166void MpiBlockVariableSynchronizerDispatcher::
167endSynchronize(IDataSynchronizeBuffer* vs_buf)
169 const Int32 nb_message = vs_buf->nbRank();
171 MpiParallelMng* pm = m_mpi_parallel_mng;
174 MP::Mpi::MpiAdapter* mpi_adapter = pm->adapter();
175 const MPI_Datatype mpi_dt = MP::Mpi::MpiBuiltIn::datatype(Byte());
177 double prepare_time = 0.0;
178 double copy_time = 0.0;
179 double wait_time = 0.0;
181 constexpr int serialize_tag = 523;
183 const Int32 block_size = m_block_size;
185 for (Int32 isequence = 0; isequence < m_nb_sequence; ++isequence) {
186 Int32 block_index = 0;
189 MpiTimeInterval tit(&prepare_time);
190 m_request_list->clear();
193 for (Integer i = 0; i < nb_message; ++i) {
194 Int32 target_rank = vs_buf->targetRank(i);
195 if (_isSkipRank(target_rank, isequence))
197 auto buf0 = vs_buf->receiveBuffer(i).bytes();
198 auto buf = buf0.subSpan(block_index, block_size);
200 auto req = mpi_adapter->receiveNonBlockingNoStat(buf.data(), buf.size(),
201 target_rank, mpi_dt, serialize_tag);
202 m_request_list->add(req);
207 for (Integer i = 0; i < nb_message; ++i) {
208 Int32 target_rank = vs_buf->targetRank(i);
209 if (_isSkipRank(my_rank, isequence))
211 auto buf0 = vs_buf->sendBuffer(i).bytes();
212 auto buf = buf0.subSpan(block_index, block_size);
214 auto request = mpi_adapter->sendNonBlockingNoStat(buf.data(), buf.size(),
215 target_rank, mpi_dt, serialize_tag);
216 m_request_list->add(request);
222 if (m_request_list->size() == 0)
227 MpiTimeInterval tit(&wait_time);
228 m_request_list->wait(Parallel::WaitAll);
231 block_index += block_size;
237 MpiTimeInterval tit(&copy_time);
238 vs_buf->copyAllReceive();
241 Int64 total_ghost_size = vs_buf->totalReceiveSize();
242 Int64 total_share_size = vs_buf->totalSendSize();
243 Int64 total_size = total_ghost_size + total_share_size;
244 pm->stat()->add("SyncCopy", copy_time, total_ghost_size);
245 pm->stat()->add("SyncWait", wait_time, total_size);
246 pm->stat()->add("SyncPrepare", prepare_time, total_share_size);
Buffer générique pour la synchronisation de données.
Interface d'une fabrique dispatcher générique.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual IStat * stat()=0
Gestionnaire des statistiques.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Implémentation par block pour MPI de la synchronisation.
Gestionnaire du parallélisme utilisant MPI.
Parallel::IStat * stat() override
Gestionnaire des statistiques.
virtual void add(const String &name, double elapsed_time, Int64 msg_size)=0
Ajoute une statistique.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
unsigned char Byte
Type d'un octet.