14#include "arcane/utils/FatalErrorException.h"
15#include "arcane/utils/MemoryView.h"
17#include "arcane/parallel/mpi/MpiParallelMng.h"
18#include "arcane/parallel/mpi/MpiTimeInterval.h"
19#include "arcane/parallel/IStat.h"
21#include "arcane/impl/IDataSynchronizeBuffer.h"
22#include "arcane/impl/IDataSynchronizeImplementation.h"
24#include "arccore/message_passing/IRequestList.h"
25#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
54class MpiBlockVariableSynchronizerDispatcher
60 explicit MpiBlockVariableSynchronizerDispatcher(
Factory* f);
64 void compute()
override {}
77 bool _isSkipRank(
Int32 rank,
Int32 sequence)
const;
89 : m_mpi_parallel_mng(mpi_pm)
90 , m_block_size(block_size)
91 , m_nb_sequence(nb_sequence)
96 auto* x =
new MpiBlockVariableSynchronizerDispatcher(
this);
103 Int32 m_block_size = 0;
104 Int32 m_nb_sequence = 0;
120MpiBlockVariableSynchronizerDispatcher::
121MpiBlockVariableSynchronizerDispatcher(Factory* f)
122: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
123, m_request_list(m_mpi_parallel_mng->createRequestListRef())
124, m_block_size(f->m_block_size)
125, m_nb_sequence(f->m_nb_sequence)
132bool MpiBlockVariableSynchronizerDispatcher::
133_isSkipRank(Int32 rank, Int32 sequence)
const
135 if (m_nb_sequence == 1)
137 return (rank % m_nb_sequence) == sequence;
143void MpiBlockVariableSynchronizerDispatcher::
144beginSynchronize(IDataSynchronizeBuffer* vs_buf)
152 double send_copy_time = 0.0;
154 MpiTimeInterval tit(&send_copy_time);
156 vs_buf->copyAllSend();
158 Int64 total_share_size = vs_buf->totalSendSize();
159 m_mpi_parallel_mng->stat()->add(
"SyncSendCopy", send_copy_time, total_share_size);
165void MpiBlockVariableSynchronizerDispatcher::
166endSynchronize(IDataSynchronizeBuffer* vs_buf)
168 const Int32 nb_message = vs_buf->nbRank();
170 MpiParallelMng* pm = m_mpi_parallel_mng;
171 Int32 my_rank = pm->commRank();
173 MP::Mpi::MpiAdapter* mpi_adapter = pm->adapter();
174 const MPI_Datatype mpi_dt = MP::Mpi::MpiBuiltIn::datatype(
Byte());
176 double prepare_time = 0.0;
177 double copy_time = 0.0;
178 double wait_time = 0.0;
180 constexpr int serialize_tag = 523;
182 const Int32 block_size = m_block_size;
184 for (
Int32 isequence = 0; isequence < m_nb_sequence; ++isequence) {
185 Int32 block_index = 0;
188 MpiTimeInterval tit(&prepare_time);
189 m_request_list->clear();
192 for (
Integer i = 0; i < nb_message; ++i) {
193 Int32 target_rank = vs_buf->targetRank(i);
194 if (_isSkipRank(target_rank, isequence))
196 auto buf0 = vs_buf->receiveBuffer(i).bytes();
197 auto buf = buf0.subSpan(block_index, block_size);
199 auto req = mpi_adapter->receiveNonBlockingNoStat(buf.data(), buf.size(),
200 target_rank, mpi_dt, serialize_tag);
201 m_request_list->add(req);
206 for (
Integer i = 0; i < nb_message; ++i) {
207 Int32 target_rank = vs_buf->targetRank(i);
208 if (_isSkipRank(my_rank, isequence))
210 auto buf0 = vs_buf->sendBuffer(i).bytes();
211 auto buf = buf0.subSpan(block_index, block_size);
213 auto request = mpi_adapter->sendNonBlockingNoStat(buf.data(), buf.size(),
214 target_rank, mpi_dt, serialize_tag);
215 m_request_list->add(request);
221 if (m_request_list->size() == 0)
226 MpiTimeInterval tit(&wait_time);
227 m_request_list->wait(Parallel::WaitAll);
230 block_index += block_size;
236 MpiTimeInterval tit(©_time);
237 vs_buf->copyAllReceive();
240 Int64 total_ghost_size = vs_buf->totalReceiveSize();
241 Int64 total_share_size = vs_buf->totalSendSize();
242 Int64 total_size = total_ghost_size + total_share_size;
243 pm->stat()->add(
"SyncCopy", copy_time, total_ghost_size);
244 pm->stat()->add(
"SyncWait", wait_time, total_size);
245 pm->stat()->add(
"SyncPrepare", prepare_time, total_share_size);
Generic buffer for data synchronization.
Interface for a generic dispatcher factory.
Parallelism manager using MPI.
Reference to an instance.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
unsigned char Byte
Type of a byte.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.