14#include "arcane/utils/MemoryView.h"
16#include "arcane/parallel/mpi/MpiParallelMng.h"
17#include "arcane/parallel/mpi/MpiDatatypeList.h"
18#include "arcane/parallel/mpi/MpiDatatype.h"
19#include "arcane/parallel/IStat.h"
21#include "arcane/impl/IDataSynchronizeBuffer.h"
22#include "arcane/impl/IDataSynchronizeImplementation.h"
24#include "arccore/message_passing_mpi/internal/IMpiProfiling.h"
25#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
// Dispatcher class whose beginSynchronize()/endSynchronize() members (defined
// further down in this file) exchange synchronization buffers through MPI
// requests.
// NOTE(review): only a fragment of the class is visible here (base classes,
// data members and the other member declarations are missing), and the
// leading numerals appear to be listing line numbers fused onto the code.
45class MpiLegacyVariableSynchronizerDispatcher
// Explicit constructor taking the factory that creates this dispatcher.
51 explicit MpiLegacyVariableSynchronizerDispatcher(
Factory* f);
// compute() is overridden with an empty body: this implementation does
// nothing when it is called.
55 void compute()
override {}
78 : m_mpi_parallel_mng(mpi_pm)
83 auto* x =
new MpiLegacyVariableSynchronizerDispatcher(
this);
96arcaneCreateMpiLegacyVariableSynchronizerFactory(
MpiParallelMng* mpi_pm)
// Constructor: copies the MpiParallelMng pointer held by the factory `f`
// into this dispatcher. `f` is dereferenced without a null check.
// NOTE(review): the constructor body is not visible in this excerpt.
105MpiLegacyVariableSynchronizerDispatcher::
106MpiLegacyVariableSynchronizerDispatcher(Factory* f)
107: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
// Starts a synchronization for the buffer `vs_buf`.
//
// Steps, in order:
//  1. for each of the vs_buf->nbRank() messages, post a receive (iRecv) when
//     the receive buffer is not empty;
//  2. call vs_buf->copyAllSend();
//  3. for each message, post a send (iSend) when the send buffer is not empty;
//  4. record the elapsed time under the "SyncPrepare" statistic.
//
// Receive requests are stored per message index in m_recv_requests, with a
// matching completion flag in m_recv_requests_done; send requests are appended
// to m_send_requests. These members are consumed by endSynchronize().
//
// NOTE(review): this listing appears to have lost lines (braces, the `else`
// of the receive branch, and the declaration of `comm` are not visible), and
// the leading numerals appear to be listing line numbers fused onto the code.
114void MpiLegacyVariableSynchronizerDispatcher::
115beginSynchronize(IDataSynchronizeBuffer* vs_buf)
// One message per rank reported by the buffer.
117 Integer nb_message = vs_buf->nbRank();
// Start from an empty list so it only holds the sends posted by this call.
119 m_send_requests.
clear();
121 MpiParallelMng* pm = m_mpi_parallel_mng;
123 MpiDatatypeList* dtlist = pm->datatypes();
// All MPI calls below go through the profiling interface of the adapter.
124 MP::Mpi::IMpiProfiling* mpi_profiling = m_mpi_parallel_mng->adapter()->getMpiProfiling();
// Buffers are exchanged as bytes: MPI datatype associated with `Byte`.
131 MPI_Datatype byte_dt = dtlist->datatype(
Byte())->datatype();
// One request slot and one completion flag per message.
135 m_recv_requests.
resize(nb_message);
136 m_recv_requests_done.
resize(nb_message);
// The timed section covers posting the receives, copyAllSend() and posting
// the sends.
137 double begin_prepare_time = MPI_Wtime();
// Post the receives.
138 for(
Integer i=0; i<nb_message; ++i ){
139 Int32 target_rank = vs_buf->targetRank(i);
140 auto ghost_local_buffer = vs_buf->receiveBuffer(i).bytes().smallView();
141 if (!ghost_local_buffer.empty()){
142 MPI_Request mpi_request;
// Tag 523 is the same literal used by the matching iSend below.
// NOTE(review): `comm` is not declared in the visible lines — presumably
// obtained from pm->communicator(); confirm against the full source.
143 mpi_profiling->iRecv(ghost_local_buffer.data(),ghost_local_buffer.size(),
144 byte_dt,target_rank,523,comm,&mpi_request);
145 m_recv_requests[i] = mpi_request;
// Mark as pending so endSynchronize() waits for it.
146 m_recv_requests_done[i] =
false;
// Presumably the empty-buffer branch (the `else` is not visible here): no
// receive is posted, the slot gets a value-initialized MPI_Request and is
// flagged as already done so it is never waited on.
152 m_recv_requests[i] = MPI_Request();
153 m_recv_requests_done[i] =
true;
// Called after the receives are posted and before any send is posted.
157 vs_buf->copyAllSend();
// Post the sends; messages with an empty send buffer are skipped.
160 for(
Integer i=0; i<nb_message; ++i ){
161 Int32 target_rank = vs_buf->targetRank(i);
162 auto share_local_buffer = vs_buf->sendBuffer(i).bytes().smallView();
163 if (!share_local_buffer.empty()){
164 MPI_Request mpi_request;
165 mpi_profiling->iSend(share_local_buffer.data(),share_local_buffer.size(),
166 byte_dt,target_rank,523,comm,&mpi_request);
// Unlike receives, send requests are appended, so their position in
// m_send_requests does not correspond to the message index.
167 m_send_requests.add(mpi_request);
// Report the elapsed time together with the total send size.
171 double prepare_time = MPI_Wtime() - begin_prepare_time;
172 pm->stat()->add(
"SyncPrepare",prepare_time,vs_buf->totalSendSize());
// Completes the synchronization started by beginSynchronize() for `vs_buf`.
//
// Pending receives (entries of m_recv_requests whose m_recv_requests_done
// flag is false) are gathered and passed to waitSome; for every completed
// request vs_buf->copyReceiveAsync(index) is called and the flag is set.
// Afterwards waitAll is called on m_send_requests, and the accumulated
// timings are recorded under the "SyncCopy" and "SyncWait" statistics.
//
// NOTE(review): this listing appears to have lost lines: the loop that
// presumably repeats the gather/waitSome section, the statement guarded by
// `if (nb_remaining_request==0)`, the trailing arguments of waitSome/waitAll
// and the braces are not visible. The leading numerals appear to be listing
// line numbers fused onto the code.
179void MpiLegacyVariableSynchronizerDispatcher::
180endSynchronize(IDataSynchronizeBuffer* vs_buf)
182 MpiParallelMng* pm = m_mpi_parallel_mng;
// NOTE(review): remaining_request and remaining_indexes are not referenced
// by any visible line of this function — likely unused; confirm and remove.
188 UniqueArray<MPI_Request> remaining_request;
189 UniqueArray<Integer> remaining_indexes;
// Output arrays for waitSome/waitAll.
191 UniqueArray<MPI_Status> mpi_status;
192 UniqueArray<int> completed_requests;
// These two are locals despite the `m_` prefix: the pending receive requests
// and, at the same position, their index in m_recv_requests.
194 UniqueArray<MPI_Request> m_remaining_recv_requests;
195 UniqueArray<Integer> m_remaining_recv_request_indexes;
// Accumulated time spent in copyReceiveAsync() and in waitSome respectively.
196 double copy_time = 0.0;
197 double wait_time = 0.0;
// Rebuild the compact list of receives that are not done yet.
199 m_remaining_recv_requests.clear();
200 m_remaining_recv_request_indexes.clear();
201 for(
Integer i=0; i<m_recv_requests.size(); ++i ){
202 if (!m_recv_requests_done[i]){
203 m_remaining_recv_requests.add(m_recv_requests[i]);
204 m_remaining_recv_request_indexes.add(i);
207 Integer nb_remaining_request = m_remaining_recv_requests.size();
// Nothing left to wait for. The guarded statement is not visible here;
// presumably it leaves the wait loop — confirm against the full source.
208 if (nb_remaining_request==0)
210 int nb_completed_request = 0;
// Size the output arrays for the worst case (every pending request completes).
211 mpi_status.resize(nb_remaining_request);
212 completed_requests.resize(nb_remaining_request);
// Time the waitSome call.
214 double begin_time = MPI_Wtime();
217 m_mpi_parallel_mng->adapter()->getMpiProfiling()->waitSome(nb_remaining_request,m_remaining_recv_requests.data(),
218 &nb_completed_request,completed_requests.data(),
221 double end_time = MPI_Wtime();
222 wait_time += (end_time-begin_time);
// Handle every request reported as completed.
225 for(
int z=0; z<nb_completed_request; ++z ){
// completed_requests holds positions in the compact list; translate back to
// the message index used by m_recv_requests and vs_buf.
226 int mpi_request_index = completed_requests[z];
227 Integer index = m_remaining_recv_request_indexes[mpi_request_index];
// Time the copy of the received data.
230 double begin_time = MPI_Wtime();
231 vs_buf->copyReceiveAsync(index);
232 double end_time = MPI_Wtime();
233 copy_time += (end_time - begin_time);
// Flag the receive as done so it is not gathered again.
236 m_recv_requests_done[index] =
true;
// Wait for the sends posted by beginSynchronize(). This wait is not added
// to wait_time.
242 mpi_status.resize(m_send_requests.size());
243 m_mpi_parallel_mng->adapter()->getMpiProfiling()->waitAll(m_send_requests.size(),m_send_requests.data(),
// Report statistics: copy time against the received size, wait time against
// the received plus sent size.
251 Int64 total_ghost_size = vs_buf->totalReceiveSize();
252 Int64 total_share_size = vs_buf->totalSendSize();
253 Int64 total_size = total_ghost_size + total_share_size;
254 pm->stat()->add(
"SyncCopy",copy_time,total_ghost_size);
255 pm->stat()->add(
"SyncWait",wait_time,total_size);
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
Buffer générique pour la synchronisation de données.
Interface d'une fabrique dispatcher générique.
Gestionnaire du parallélisme utilisant MPI.
Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
Référence à une instance.
Vecteur 1D de données avec sémantique par valeur (style STL).
void clear()
Supprime les éléments du tableau.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
unsigned char Byte
Type d'un octet.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Crée une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.