14#include "arcane/utils/FatalErrorException.h"
15#include "arcane/utils/MemoryView.h"
17#include "arcane/parallel/mpi/MpiParallelMng.h"
18#include "arcane/parallel/mpi/MpiTimeInterval.h"
19#include "arcane/parallel/IStat.h"
21#include "arcane/impl/IDataSynchronizeBuffer.h"
22#include "arcane/impl/IDataSynchronizeImplementation.h"
24#include "arccore/message_passing/IRequestList.h"
25#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
64class MpiVariableSynchronizeDispatcher
70 explicit MpiVariableSynchronizeDispatcher(
Factory* f);
74 void compute()
override {}
96 : m_mpi_parallel_mng(mpi_pm)
101 auto* x =
new MpiVariableSynchronizeDispatcher(
this);
123MpiVariableSynchronizeDispatcher::
124MpiVariableSynchronizeDispatcher(Factory* f)
125: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
126, m_receive_request_list(m_mpi_parallel_mng->createRequestListRef())
127, m_send_request_list(m_mpi_parallel_mng->createRequestListRef())
134void MpiVariableSynchronizeDispatcher::
135beginSynchronize(IDataSynchronizeBuffer* ds_buf)
137 Integer nb_message = ds_buf->nbRank();
139 m_send_request_list->clear();
141 MpiParallelMng* pm = m_mpi_parallel_mng;
143 MP::Mpi::MpiAdapter* mpi_adapter = pm->adapter();
144 const MPI_Datatype mpi_dt = MP::Mpi::MpiBuiltIn::datatype(
Byte());
146 double prepare_time = 0.0;
149 MpiTimeInterval tit(&prepare_time);
150 constexpr int serialize_tag = 523;
153 m_original_recv_requests_done.
resize(nb_message);
154 m_original_recv_requests.resize(nb_message);
157 for (
Integer i = 0; i < nb_message; ++i) {
158 Int32 target_rank = ds_buf->targetRank(i);
159 auto buf = ds_buf->receiveBuffer(i).bytes();
161 auto req = mpi_adapter->receiveNonBlockingNoStat(buf.data(), buf.size(),
162 target_rank, mpi_dt, serialize_tag);
163 m_original_recv_requests[i] = req;
164 m_original_recv_requests_done[i] =
false;
169 m_original_recv_requests[i] = Parallel::Request{};
170 m_original_recv_requests_done[i] =
true;
175 ds_buf->copyAllSend();
178 for (
Integer i = 0; i < nb_message; ++i) {
179 auto buf = ds_buf->sendBuffer(i).bytes();
180 Int32 target_rank = ds_buf->targetRank(i);
182 auto request = mpi_adapter->sendNonBlockingNoStat(buf.data(), buf.size(),
183 target_rank, mpi_dt, serialize_tag);
184 m_send_request_list->add(request);
188 pm->stat()->add(
"SyncPrepare", prepare_time, ds_buf->totalSendSize());
194void MpiVariableSynchronizeDispatcher::
195endSynchronize(IDataSynchronizeBuffer* ds_buf)
197 MpiParallelMng* pm = m_mpi_parallel_mng;
201 UniqueArray<Integer> remaining_original_indexes;
203 double copy_time = 0.0;
204 double wait_time = 0.0;
208 m_receive_request_list->clear();
209 remaining_original_indexes.clear();
210 for (
Integer i = 0, n = m_original_recv_requests_done.size(); i < n; ++i) {
211 if (!m_original_recv_requests_done[i]) {
212 m_receive_request_list->add(m_original_recv_requests[i]);
213 remaining_original_indexes.add(i);
216 Integer nb_remaining_request = m_receive_request_list->size();
217 if (nb_remaining_request == 0)
221 MpiTimeInterval tit(&wait_time);
226 ConstArrayView<Int32> done_requests = m_receive_request_list->doneRequestIndexes();
228 for (
Int32 request_index : done_requests) {
229 Int32 orig_index = remaining_original_indexes[request_index];
232 m_original_recv_requests_done[orig_index] =
true;
236 MpiTimeInterval tit(©_time);
237 ds_buf->copyReceiveAsync(orig_index);
246 MpiTimeInterval tit(&wait_time);
247 m_send_request_list->wait(Parallel::WaitAll);
253 Int64 total_ghost_size = ds_buf->totalReceiveSize();
254 Int64 total_share_size = ds_buf->totalSendSize();
255 Int64 total_size = total_ghost_size + total_share_size;
256 pm->stat()->add(
"SyncCopy", copy_time, total_ghost_size);
257 pm->stat()->add(
"SyncWait", wait_time, total_size);
void resize(Int64 s)
Changes the number of elements in the array to s.
Generic buffer for data synchronization.
Interface for a generic dispatcher factory.
Parallelism manager using MPI.
Reference to an instance.
1D data vector with value semantics (STL style).
@ WaitSome
Wait until all messages in the list are processed.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
unsigned char Byte
Type of a byte.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.