14#include "arcane/utils/MemoryView.h"
16#include "arcane/parallel/mpi/MpiParallelMng.h"
17#include "arcane/parallel/mpi/MpiDatatypeList.h"
18#include "arcane/parallel/mpi/MpiDatatype.h"
19#include "arcane/parallel/IStat.h"
21#include "arcane/impl/IDataSynchronizeBuffer.h"
22#include "arcane/impl/IDataSynchronizeImplementation.h"
24#include "arccore/message_passing_mpi/internal/IMpiProfiling.h"
25#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
// Declaration of the dispatcher class (only part of the declaration is visible in this listing).
46class MpiLegacyVariableSynchronizerDispatcher
// Constructor taking the factory; declared explicit, so there is no implicit conversion from Factory*.
52 explicit MpiLegacyVariableSynchronizerDispatcher(
Factory* f);
// compute() is overridden with an empty body: this implementation does nothing in that step.
56 void compute()
override {}
// Member initializer: keeps the MpiParallelMng pointer received as mpi_pm.
79 : m_mpi_parallel_mng(mpi_pm)
// Allocates a dispatcher with a raw new, passing this object as the factory argument.
// NOTE(review): the code that takes ownership of x is not visible here — confirm it is wrapped
// in an owning handle before being returned.
84 auto* x =
new MpiLegacyVariableSynchronizerDispatcher(
this);
// Creation function for the legacy MPI synchronizer factory; takes the MpiParallelMng to use.
// NOTE(review): the return type and the body are not visible in this listing.
97arcaneCreateMpiLegacyVariableSynchronizerFactory(
MpiParallelMng* mpi_pm)
// Constructor: copies the MpiParallelMng pointer held by the factory f into this dispatcher.
// NOTE(review): f is dereferenced without a null check — callers must pass a valid factory.
106MpiLegacyVariableSynchronizerDispatcher::
107MpiLegacyVariableSynchronizerDispatcher(Factory* f)
108: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
// Starts a synchronization: posts one non-blocking receive and one non-blocking send per
// non-empty buffer of vs_buf, then records the elapsed time under the "SyncPrepare" statistic.
//
// vs_buf supplies the number of messages (nbRank()), the rank associated with each message
// (targetRank(i)) and the receive/send buffers. The requests created here are stored in
// m_recv_requests / m_send_requests, which endSynchronize() later waits on.
115void MpiLegacyVariableSynchronizerDispatcher::
116beginSynchronize(IDataSynchronizeBuffer* vs_buf)
// One message slot per rank reported by the buffer.
118 Integer nb_message = vs_buf->nbRank();
// Send requests are appended below with add(), so start from an empty list.
120 m_send_requests.
clear();
122 MpiParallelMng* pm = m_mpi_parallel_mng;
124 MpiDatatypeList* dtlist = pm->datatypes();
// All MPI calls of this function go through the profiling interface of the adapter.
125 MP::Mpi::IMpiProfiling* mpi_profiling = m_mpi_parallel_mng->adapter()->getMpiProfiling();
// Buffers are exchanged as raw bytes, so the MPI datatype matching Byte is used for every message.
132 MPI_Datatype byte_dt = dtlist->datatype(
Byte())->datatype();
// Receive bookkeeping is indexed by message number: one request and one "done" flag per message.
136 m_recv_requests.
resize(nb_message);
137 m_recv_requests_done.
resize(nb_message);
// Start of the interval reported as "SyncPrepare".
138 double begin_prepare_time = MPI_Wtime();
// First pass: post the receives.
139 for (
Integer i = 0; i < nb_message; ++i) {
140 Int32 target_rank = vs_buf->targetRank(i);
141 auto ghost_local_buffer = vs_buf->receiveBuffer(i).bytes().smallView();
// Only non-empty buffers produce an MPI receive.
142 if (!ghost_local_buffer.empty()) {
143 MPI_Request mpi_request;
// Receive uses the literal tag 523, the same tag as the sends posted further down.
// NOTE(review): the declaration of comm is not visible in this listing.
144 mpi_profiling->iRecv(ghost_local_buffer.data(), ghost_local_buffer.size(),
145 byte_dt, target_rank, 523, comm, &mpi_request);
146 m_recv_requests[i] = mpi_request;
147 m_recv_requests_done[i] =
false;
// Presumably the branch taken for an empty buffer (the else line is not visible here):
// a value-initialized request is stored and the slot is flagged as already done,
// so endSynchronize() does not wait on it.
153 m_recv_requests[i] = MPI_Request();
154 m_recv_requests_done[i] =
true;
// Called after the receives are posted and before any send is posted.
// Presumably fills the send buffers from the variable values — confirm against IDataSynchronizeBuffer.
158 vs_buf->copyAllSend();
// Second pass: post the sends.
161 for (
Integer i = 0; i < nb_message; ++i) {
162 Int32 target_rank = vs_buf->targetRank(i);
163 auto share_local_buffer = vs_buf->sendBuffer(i).bytes().smallView();
// Empty send buffers are skipped: no request is added for them.
164 if (!share_local_buffer.empty()) {
165 MPI_Request mpi_request;
166 mpi_profiling->iSend(share_local_buffer.data(), share_local_buffer.size(),
167 byte_dt, target_rank, 523, comm, &mpi_request);
168 m_send_requests.add(mpi_request);
// The measured interval covers posting the receives, copyAllSend() and posting the sends.
172 double prepare_time = MPI_Wtime() - begin_prepare_time;
173 pm->stat()->add(
"SyncPrepare", prepare_time, vs_buf->totalSendSize());
// Finishes a synchronization started by beginSynchronize(): waits for the pending receives,
// calls vs_buf->copyReceiveAsync() for each completed one, waits for the sends, and records
// the accumulated times under the "SyncCopy" and "SyncWait" statistics.
180void MpiLegacyVariableSynchronizerDispatcher::
181endSynchronize(IDataSynchronizeBuffer* vs_buf)
183 MpiParallelMng* pm = m_mpi_parallel_mng;
// NOTE(review): remaining_request and remaining_indexes are not used in any line visible
// in this listing — confirm against the full file and remove them if they are dead.
189 UniqueArray<MPI_Request> remaining_request;
190 UniqueArray<Integer> remaining_indexes;
// Output arrays for the MPI wait calls below.
192 UniqueArray<MPI_Status> mpi_status;
193 UniqueArray<int> completed_requests;
// NOTE(review): these two are local variables despite the m_ prefix.
// m_remaining_recv_request_indexes[k] is the message index of m_remaining_recv_requests[k].
195 UniqueArray<MPI_Request> m_remaining_recv_requests;
196 UniqueArray<Integer> m_remaining_recv_request_indexes;
// Accumulated durations, reported at the end of the function.
197 double copy_time = 0.0;
198 double wait_time = 0.0;
// Rebuild the list of receives that are not yet flagged as done.
// Presumably this section runs inside a loop whose header is not visible in this listing
// (the lists are cleared and the times are accumulated with +=) — confirm.
200 m_remaining_recv_requests.clear();
201 m_remaining_recv_request_indexes.clear();
202 for (
Integer i = 0; i < m_recv_requests.size(); ++i) {
203 if (!m_recv_requests_done[i]) {
204 m_remaining_recv_requests.add(m_recv_requests[i]);
205 m_remaining_recv_request_indexes.add(i);
208 Integer nb_remaining_request = m_remaining_recv_requests.size();
// Nothing left to wait for. The statement controlled by this test is not visible in this listing
// (presumably it leaves the waiting loop).
209 if (nb_remaining_request == 0)
// Size the output arrays so they can hold one entry per pending request.
211 int nb_completed_request = 0;
212 mpi_status.resize(nb_remaining_request);
213 completed_requests.resize(nb_remaining_request);
// Time spent in the wait call is added to wait_time.
215 double begin_time = MPI_Wtime();
// Waits on the pending receives; the number of completed requests and their positions in
// m_remaining_recv_requests are written to nb_completed_request / completed_requests.
// The remaining arguments of this call are not visible in this listing.
218 m_mpi_parallel_mng->adapter()->getMpiProfiling()->waitSome(nb_remaining_request, m_remaining_recv_requests.data(),
219 &nb_completed_request, completed_requests.data(),
222 double end_time = MPI_Wtime();
223 wait_time += (end_time - begin_time);
// Process each receive reported as completed.
226 for (
int z = 0; z < nb_completed_request; ++z) {
// completed_requests holds positions in the "remaining" list; map them back to message indexes.
227 int mpi_request_index = completed_requests[z];
228 Integer index = m_remaining_recv_request_indexes[mpi_request_index];
// Time spent in copyReceiveAsync() is added to copy_time.
// NOTE(review): the name suggests an asynchronous copy, in which case only the launch is timed — confirm.
231 double begin_time = MPI_Wtime();
232 vs_buf->copyReceiveAsync(index);
233 double end_time = MPI_Wtime();
234 copy_time += (end_time - begin_time);
// Flag the message so it is not collected again as a pending receive.
237 m_recv_requests_done[index] =
true;
// Wait for all sends posted by beginSynchronize(); mpi_status is resized to one entry per send request.
// The remaining arguments of this call are not visible in this listing.
243 mpi_status.resize(m_send_requests.size());
244 m_mpi_parallel_mng->adapter()->getMpiProfiling()->waitAll(m_send_requests.size(), m_send_requests.data(),
// Statistics: "SyncCopy" is reported with the receive size only,
// "SyncWait" with the sum of the receive and send sizes.
252 Int64 total_ghost_size = vs_buf->totalReceiveSize();
253 Int64 total_share_size = vs_buf->totalSendSize();
254 Int64 total_size = total_ghost_size + total_share_size;
255 pm->stat()->add(
"SyncCopy", copy_time, total_ghost_size);
256 pm->stat()->add(
"SyncWait", wait_time, total_size);
void resize(Int64 s)
Changes the number of elements in the array to s.
Generic buffer for data synchronization.
Interface for a generic dispatcher factory.
Parallelism manager using MPI.
Communicator communicator() const override
MPI communicator associated with this manager.
Reference to an instance.
1D data vector with value semantics (STL style).
void resize(Int64 s)
Changes the number of elements in the array to s.
void clear()
Removes the elements from the array.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
unsigned char Byte
Type of a byte.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.