#include "arcane/utils/MemoryView.h"

#include "arcane/parallel/mpi/MpiParallelMng.h"
#include "arcane/parallel/mpi/MpiAdapter.h"
#include "arcane/parallel/mpi/MpiDatatypeList.h"
#include "arcane/parallel/mpi/MpiDatatype.h"
#include "arcane/parallel/IStat.h"

#include "arcane/impl/IDataSynchronizeBuffer.h"
#include "arcane/impl/IDataSynchronizeImplementation.h"
// Fragments from class declarations earlier in the file (surrounding lines
// are elided in this listing):
void compute() override {}

: m_mpi_parallel_mng(mpi_pm)
MpiLegacyVariableSynchronizerDispatcher::
MpiLegacyVariableSynchronizerDispatcher(Factory* f)
: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
{
}
void MpiLegacyVariableSynchronizerDispatcher::
beginSynchronize(IDataSynchronizeBuffer* vs_buf)
{
  Integer nb_message = vs_buf->nbRank();

  m_send_requests.clear();

  MpiParallelMng* pm = m_mpi_parallel_mng;

  MpiDatatypeList* dtlist = pm->datatypes();
  MP::Mpi::IMpiProfiling* mpi_profiling = m_mpi_parallel_mng->adapter()->getMpiProfiling();

  MPI_Comm comm = pm->communicator();
  MPI_Datatype byte_dt = dtlist->datatype(Byte())->datatype();

  // Post the non-blocking receives first.
  m_recv_requests.resize(nb_message);
  m_recv_requests_done.resize(nb_message);
  double begin_prepare_time = MPI_Wtime();
  for( Integer i=0; i<nb_message; ++i ){
    Int32 target_rank = vs_buf->targetRank(i);
    auto ghost_local_buffer = vs_buf->receiveBuffer(i).bytes().smallView();
    if (!ghost_local_buffer.empty()){
      MPI_Request mpi_request;
      mpi_profiling->iRecv(ghost_local_buffer.data(),ghost_local_buffer.size(),
                           byte_dt,target_rank,523,comm,&mpi_request);
      m_recv_requests[i] = mpi_request;
      m_recv_requests_done[i] = false;
    }
    else{
      // Nothing to receive from this rank: mark the request as already done.
      m_recv_requests[i] = MPI_Request();
      m_recv_requests_done[i] = true;
    }
  }

  vs_buf->copyAllSend();

  // Then post the non-blocking sends.
  for( Integer i=0; i<nb_message; ++i ){
    Int32 target_rank = vs_buf->targetRank(i);
    auto share_local_buffer = vs_buf->sendBuffer(i).bytes().smallView();
    if (!share_local_buffer.empty()){
      MPI_Request mpi_request;
      mpi_profiling->iSend(share_local_buffer.data(),share_local_buffer.size(),
                           byte_dt,target_rank,523,comm,&mpi_request);
      m_send_requests.add(mpi_request);
    }
  }
  double prepare_time = MPI_Wtime() - begin_prepare_time;
  pm->stat()->add("SyncPrepare",prepare_time,vs_buf->totalSendSize());
}
void MpiLegacyVariableSynchronizerDispatcher::
endSynchronize(IDataSynchronizeBuffer* vs_buf)
{
  MpiParallelMng* pm = m_mpi_parallel_mng;

  UniqueArray<MPI_Request> remaining_request;
  UniqueArray<Integer> remaining_indexes;

  UniqueArray<MPI_Status> mpi_status;
  UniqueArray<int> completed_requests;

  UniqueArray<MPI_Request> m_remaining_recv_requests;
  UniqueArray<Integer> m_remaining_recv_request_indexes;
  double copy_time = 0.0;
  double wait_time = 0.0;
  // Loop until every receive has completed and its data has been copied.
  while (true){
    m_remaining_recv_requests.clear();
    m_remaining_recv_request_indexes.clear();
    for( Integer i=0; i<m_recv_requests.size(); ++i ){
      if (!m_recv_requests_done[i]){
        m_remaining_recv_requests.add(m_recv_requests[i]);
        m_remaining_recv_request_indexes.add(i);
      }
    }

    Integer nb_remaining_request = m_remaining_recv_requests.size();
    if (nb_remaining_request==0)
      break;

    int nb_completed_request = 0;
    mpi_status.resize(nb_remaining_request);
    completed_requests.resize(nb_remaining_request);

    {
      double begin_time = MPI_Wtime();
      m_mpi_parallel_mng->adapter()->getMpiProfiling()->waitSome(nb_remaining_request,m_remaining_recv_requests.data(),
                                                                 &nb_completed_request,completed_requests.data(),
                                                                 mpi_status.data());
      double end_time = MPI_Wtime();
      wait_time += (end_time-begin_time);
    }

    // Copy the data of each completed receive while the others are still in flight.
    for( int z=0; z<nb_completed_request; ++z ){
      int mpi_request_index = completed_requests[z];
      Integer index = m_remaining_recv_request_indexes[mpi_request_index];

      {
        double begin_time = MPI_Wtime();
        vs_buf->copyReceiveAsync(index);
        double end_time = MPI_Wtime();
        copy_time += (end_time - begin_time);
      }

      m_recv_requests_done[index] = true;
    }
  }

  // Wait for every send to finish so the requests can be released.
  mpi_status.resize(m_send_requests.size());
  m_mpi_parallel_mng->adapter()->getMpiProfiling()->waitAll(m_send_requests.size(),m_send_requests.data(),
                                                            mpi_status.data());

  Int64 total_ghost_size = vs_buf->totalReceiveSize();
  Int64 total_share_size = vs_buf->totalSendSize();
  Int64 total_size = total_ghost_size + total_share_size;
  pm->stat()->add("SyncCopy",copy_time,total_ghost_size);
  pm->stat()->add("SyncWait",wait_time,total_size);
}
Generic buffer for data synchronization.
Interface of a generic dispatcher factory.
virtual IStat * stat()=0
Statistics manager.
virtual Parallel::Communicator communicator() const =0
MPI communicator associated with this manager.
Mesh file reader using the LIMA library.
MPI-optimized implementation of the synchronization.
Parallelism manager using MPI.
Integer size() const
Number of elements in the vector.
const T * data() const
Unchecked access to the underlying array storage.
void resize(Int64 s)
Changes the number of elements of the array to s.
void add(ConstReferenceType val)
Appends the element val at the end of the array.
void clear()
Removes all elements from the array.
unsigned char Byte
Type of a byte.
Int32 Integer
Type representing an integer.
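A quick illustration of the array operations listed above (a minimal sketch, assuming the "arcane/utils/UniqueArray.h" header and the Arcane namespace are available):

// --- Illustrative sketch of the container operations described above ---
#include "arcane/utils/UniqueArray.h"

using namespace Arcane;

void example()
{
  UniqueArray<Integer> values;
  values.clear();                     // removes all elements (no-op here)
  values.resize(3);                   // the array now holds 3 elements
  values.add(42);                     // appends one element; size() becomes 4
  Integer n = values.size();          // number of elements
  const Integer* raw = values.data(); // unchecked access to the underlying storage
  (void)n;
  (void)raw;
}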