Arcane  v3.15.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
MpiLegacyVariableSynchronizeDispatcher.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2023 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiVariableSynchronizeDispatcher.cc (C) 2000-2023 */
9/* */
10/* Gestion spécifique MPI des synchronisations des variables. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/MemoryView.h"
15
16#include "arcane/parallel/mpi/MpiParallelMng.h"
17#include "arcane/parallel/mpi/MpiAdapter.h"
18#include "arcane/parallel/mpi/MpiDatatypeList.h"
19#include "arcane/parallel/mpi/MpiDatatype.h"
20#include "arcane/parallel/IStat.h"
21
22#include "arcane/impl/IDataSynchronizeBuffer.h"
23#include "arcane/impl/IDataSynchronizeImplementation.h"
24
25/*---------------------------------------------------------------------------*/
26/*---------------------------------------------------------------------------*/
27
28namespace Arcane
29{
30
31/*---------------------------------------------------------------------------*/
32/*---------------------------------------------------------------------------*/
45{
46 public:
47
48 class Factory;
50
51 protected:
52
53 void compute() override {}
54 void beginSynchronize(IDataSynchronizeBuffer* buf) override;
55 void endSynchronize(IDataSynchronizeBuffer* buf) override;
56
57 private:
58
59 MpiParallelMng* m_mpi_parallel_mng;
60 UniqueArray<MPI_Request> m_send_requests;
61 UniqueArray<MPI_Request> m_recv_requests;
62 UniqueArray<Integer> m_recv_requests_done;
63 UniqueArray<MPI_Datatype> m_share_derived_types;
64 UniqueArray<MPI_Datatype> m_ghost_derived_types;
65};
66
67/*---------------------------------------------------------------------------*/
68/*---------------------------------------------------------------------------*/
69
72{
73 public:
74
76 : m_mpi_parallel_mng(mpi_pm)
77 {}
78
79 Ref<IDataSynchronizeImplementation> createInstance() override
80 {
83 }
84
85 public:
86
87 MpiParallelMng* m_mpi_parallel_mng = nullptr;
88};
89
90/*---------------------------------------------------------------------------*/
91/*---------------------------------------------------------------------------*/
92
94arcaneCreateMpiLegacyVariableSynchronizerFactory(MpiParallelMng* mpi_pm)
95{
98}
99
100/*---------------------------------------------------------------------------*/
101/*---------------------------------------------------------------------------*/
102
103MpiLegacyVariableSynchronizerDispatcher::
104MpiLegacyVariableSynchronizerDispatcher(Factory* f)
105: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
106{
107}
108
109/*---------------------------------------------------------------------------*/
110/*---------------------------------------------------------------------------*/
111
112void MpiLegacyVariableSynchronizerDispatcher::
113beginSynchronize(IDataSynchronizeBuffer* vs_buf)
114{
115 Integer nb_message = vs_buf->nbRank();
116
117 m_send_requests.clear();
118
119 MpiParallelMng* pm = m_mpi_parallel_mng;
120 MPI_Comm comm = pm->communicator();
121 MpiDatatypeList* dtlist = pm->datatypes();
122 MP::Mpi::IMpiProfiling* mpi_profiling = m_mpi_parallel_mng->adapter()->getMpiProfiling();
123
124 //ITraceMng* trace = pm->traceMng();
125 //trace->info() << " ** ** MPI BEGIN SYNC n=" << nb_message
126 // << " this=" << (IVariableSynchronizeDispatcher*)this;
127 //trace->flush();
128
129 MPI_Datatype byte_dt = dtlist->datatype(Byte())->datatype();
130
131 //SyncBuffer& sync_buffer = this->m_1d_buffer;
132 // Envoie les messages de réception en mode non bloquant
133 m_recv_requests.resize(nb_message);
134 m_recv_requests_done.resize(nb_message);
135 double begin_prepare_time = MPI_Wtime();
136 for( Integer i=0; i<nb_message; ++i ){
137 Int32 target_rank = vs_buf->targetRank(i);
138 auto ghost_local_buffer = vs_buf->receiveBuffer(i).bytes().smallView();
139 if (!ghost_local_buffer.empty()){
140 MPI_Request mpi_request;
141 mpi_profiling->iRecv(ghost_local_buffer.data(),ghost_local_buffer.size(),
142 byte_dt,target_rank,523,comm,&mpi_request);
143 m_recv_requests[i] = mpi_request;
144 m_recv_requests_done[i] = false;
145 //trace->info() << "POST RECV " << vsi.m_target_rank;
146 }
147 else{
148 // Il n'est pas nécessaire d'envoyer un message vide.
149 // Considère le message comme terminé
150 m_recv_requests[i] = MPI_Request();
151 m_recv_requests_done[i] = true;
152 }
153 }
154
155 vs_buf->copyAllSend();
156
157 // Envoie les messages d'envoi en mode non bloquant.
158 for( Integer i=0; i<nb_message; ++i ){
159 Int32 target_rank = vs_buf->targetRank(i);
160 auto share_local_buffer = vs_buf->sendBuffer(i).bytes().smallView();
161 if (!share_local_buffer.empty()){
162 MPI_Request mpi_request;
163 mpi_profiling->iSend(share_local_buffer.data(),share_local_buffer.size(),
164 byte_dt,target_rank,523,comm,&mpi_request);
165 m_send_requests.add(mpi_request);
166 }
167 }
168 {
169 double prepare_time = MPI_Wtime() - begin_prepare_time;
170 pm->stat()->add("SyncPrepare",prepare_time,vs_buf->totalSendSize());
171 }
172}
173
174/*---------------------------------------------------------------------------*/
175/*---------------------------------------------------------------------------*/
176
177void MpiLegacyVariableSynchronizerDispatcher::
178endSynchronize(IDataSynchronizeBuffer* vs_buf)
179{
180 MpiParallelMng* pm = m_mpi_parallel_mng;
181
182 //ITraceMng* trace = pm->traceMng();
183 //trace->info() << " ** ** MPI END SYNC "
184 // << " this=" << (IVariableSynchronizeDispatcher*)this;
185
186 UniqueArray<MPI_Request> remaining_request;
187 UniqueArray<Integer> remaining_indexes;
188
189 UniqueArray<MPI_Status> mpi_status;
190 UniqueArray<int> completed_requests;
191
192 UniqueArray<MPI_Request> m_remaining_recv_requests;
193 UniqueArray<Integer> m_remaining_recv_request_indexes;
194 double copy_time = 0.0;
195 double wait_time = 0.0;
196 while(1){
197 m_remaining_recv_requests.clear();
198 m_remaining_recv_request_indexes.clear();
199 for( Integer i=0; i<m_recv_requests.size(); ++i ){
200 if (!m_recv_requests_done[i]){
201 m_remaining_recv_requests.add(m_recv_requests[i]);
202 m_remaining_recv_request_indexes.add(i); //m_recv_request_indexes[i]);
203 }
204 }
205 Integer nb_remaining_request = m_remaining_recv_requests.size();
206 if (nb_remaining_request==0)
207 break;
208 int nb_completed_request = 0;
209 mpi_status.resize(nb_remaining_request);
210 completed_requests.resize(nb_remaining_request);
211 {
212 double begin_time = MPI_Wtime();
213 //trace->info() << "Wait some: n=" << nb_remaining_request
214 // << " total=" << nb_message;
215 m_mpi_parallel_mng->adapter()->getMpiProfiling()->waitSome(nb_remaining_request,m_remaining_recv_requests.data(),
216 &nb_completed_request,completed_requests.data(),
217 mpi_status.data());
218 //trace->info() << "Wait some end: nb_done=" << nb_completed_request;
219 double end_time = MPI_Wtime();
220 wait_time += (end_time-begin_time);
221 }
222 // Pour chaque requête terminée, effectue la copie
223 for( int z=0; z<nb_completed_request; ++z ){
224 int mpi_request_index = completed_requests[z];
225 Integer index = m_remaining_recv_request_indexes[mpi_request_index];
226
227 {
228 double begin_time = MPI_Wtime();
229 vs_buf->copyReceiveAsync(index);
230 double end_time = MPI_Wtime();
231 copy_time += (end_time - begin_time);
232 }
233 //trace->info() << "Mark finish index = " << index << " mpi_request_index=" << mpi_request_index;
234 m_recv_requests_done[index] = true; // Pour indiquer que c'est fini
235 }
236 }
237
238 //trace->info() << "Wait all begin: n=" << m_send_requests.size();
239 // Attend que les envois se terminent
240 mpi_status.resize(m_send_requests.size());
241 m_mpi_parallel_mng->adapter()->getMpiProfiling()->waitAll(m_send_requests.size(),m_send_requests.data(),
242 mpi_status.data());
243
244 // S'assure que les copies des buffers sont bien terminées
245 vs_buf->barrier();
246
247 //trace->info() << "Wait all end";
248 {
249 Int64 total_ghost_size = vs_buf->totalReceiveSize();
250 Int64 total_share_size = vs_buf->totalSendSize();
251 Int64 total_size = total_ghost_size + total_share_size;
252 pm->stat()->add("SyncCopy",copy_time,total_ghost_size);
253 pm->stat()->add("SyncWait",wait_time,total_size);
254 }
255}
256
257/*---------------------------------------------------------------------------*/
258/*---------------------------------------------------------------------------*/
259
260} // End namespace Arcane
261
262/*---------------------------------------------------------------------------*/
263/*---------------------------------------------------------------------------*/
Buffer générique pour la synchronisation de données.
Interface d'une fabrique dispatcher générique.
virtual IStat * stat()=0
Gestionnaire des statistiques.
virtual Parallel::Communicator communicator() const =0
Communicateur MPI associé à ce gestionnaire.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:149
Implémentation optimisée pour MPI de la synchronisation.
Gestionnaire du parallélisme utilisant MPI.
Integer size() const
Nombre d'éléments du vecteur.
const T * data() const
Accès à la racine du tableau hors toute protection.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
void clear()
Supprime les éléments du tableau.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
unsigned char Byte
Type d'un octet.
Definition UtilsTypes.h:148
Int32 Integer
Type représentant un entier.