Arcane  v3.16.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
MpiLegacyVariableSynchronizeDispatcher.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiVariableSynchronizeDispatcher.cc (C) 2000-2025 */
9/* */
10/* Gestion spécifique MPI des synchronisations des variables. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/MemoryView.h"
15
16#include "arcane/parallel/mpi/MpiParallelMng.h"
17#include "arcane/parallel/mpi/MpiDatatypeList.h"
18#include "arcane/parallel/mpi/MpiDatatype.h"
19#include "arcane/parallel/IStat.h"
20
21#include "arcane/impl/IDataSynchronizeBuffer.h"
22#include "arcane/impl/IDataSynchronizeImplementation.h"
23
24#include "arccore/message_passing_mpi/internal/IMpiProfiling.h"
25#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
26
27/*---------------------------------------------------------------------------*/
28/*---------------------------------------------------------------------------*/
29
30namespace Arcane
31{
32
33/*---------------------------------------------------------------------------*/
34/*---------------------------------------------------------------------------*/
/*!
 * \brief Legacy MPI implementation of variable synchronization.
 *
 * Posts one non-blocking receive and one non-blocking send per neighbour
 * rank in beginSynchronize(), and waits for their completion (copying the
 * received data as it arrives) in endSynchronize().
 */
class MpiLegacyVariableSynchronizerDispatcher
// NOTE(review): the base-class line was lost in extraction; the 'override'
// specifiers below require one. Restored from the factory's return type
// (Ref<IDataSynchronizeImplementation>) — TODO confirm against repository.
: public AbstractDataSynchronizeImplementation
{
 public:

  class Factory;
  explicit MpiLegacyVariableSynchronizerDispatcher(Factory* f);

 protected:

  // No precomputation is needed by this legacy implementation.
  void compute() override {}
  // Posts the non-blocking receives and sends for one synchronization.
  void beginSynchronize(IDataSynchronizeBuffer* buf) override;
  // Waits for all pending requests and copies received data.
  void endSynchronize(IDataSynchronizeBuffer* buf) override;

 private:

  MpiParallelMng* m_mpi_parallel_mng;
  //! Pending send requests (only for non-empty send buffers).
  UniqueArray<MPI_Request> m_send_requests;
  //! One receive request per neighbour rank (indexed like the buffer ranks).
  UniqueArray<MPI_Request> m_recv_requests;
  //! For each receive request, non-zero once it has completed.
  UniqueArray<Integer> m_recv_requests_done;
  UniqueArray<MPI_Datatype> m_share_derived_types;
  UniqueArray<MPI_Datatype> m_ghost_derived_types;
};
68
69/*---------------------------------------------------------------------------*/
70/*---------------------------------------------------------------------------*/
71
74{
75 public:
76
77 explicit Factory(MpiParallelMng* mpi_pm)
78 : m_mpi_parallel_mng(mpi_pm)
79 {}
80
81 Ref<IDataSynchronizeImplementation> createInstance() override
82 {
83 auto* x = new MpiLegacyVariableSynchronizerDispatcher(this);
85 }
86
87 public:
88
89 MpiParallelMng* m_mpi_parallel_mng = nullptr;
90};
91
92/*---------------------------------------------------------------------------*/
93/*---------------------------------------------------------------------------*/
94
96arcaneCreateMpiLegacyVariableSynchronizerFactory(MpiParallelMng* mpi_pm)
97{
100}
101
102/*---------------------------------------------------------------------------*/
103/*---------------------------------------------------------------------------*/
104
105MpiLegacyVariableSynchronizerDispatcher::
106MpiLegacyVariableSynchronizerDispatcher(Factory* f)
107: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
108{
109}
110
111/*---------------------------------------------------------------------------*/
112/*---------------------------------------------------------------------------*/
113
/*!
 * \brief Posts the non-blocking receives and sends for one synchronization.
 *
 * For each neighbour rank a byte-typed non-blocking receive is posted on the
 * corresponding receive buffer; empty buffers are marked done immediately
 * since the peer never sends an empty message. The send buffers are then
 * filled (copyAllSend()) and a non-blocking send is posted for each
 * non-empty one. Completion is handled by endSynchronize().
 *
 * \param vs_buf synchronization buffer holding per-rank send/receive areas.
 */
void MpiLegacyVariableSynchronizerDispatcher::
beginSynchronize(IDataSynchronizeBuffer* vs_buf)
{
  Integer nb_message = vs_buf->nbRank();

  // Tag shared by every synchronization message: the iRecv and iSend calls
  // below must use the same value for messages to match.
  const int message_tag = 523;

  m_send_requests.clear();

  MpiParallelMng* pm = m_mpi_parallel_mng;
  MPI_Comm comm = pm->communicator();
  MpiDatatypeList* dtlist = pm->datatypes();
  MP::Mpi::IMpiProfiling* mpi_profiling = m_mpi_parallel_mng->adapter()->getMpiProfiling();

  // All buffers are exchanged as raw bytes.
  MPI_Datatype byte_dt = dtlist->datatype(Byte())->datatype();

  // Post the receives in non-blocking mode.
  m_recv_requests.resize(nb_message);
  m_recv_requests_done.resize(nb_message);
  double begin_prepare_time = MPI_Wtime();
  for( Integer i=0; i<nb_message; ++i ){
    Int32 target_rank = vs_buf->targetRank(i);
    auto ghost_local_buffer = vs_buf->receiveBuffer(i).bytes().smallView();
    if (!ghost_local_buffer.empty()){
      MPI_Request mpi_request;
      mpi_profiling->iRecv(ghost_local_buffer.data(),ghost_local_buffer.size(),
                           byte_dt,target_rank,message_tag,comm,&mpi_request);
      m_recv_requests[i] = mpi_request;
      m_recv_requests_done[i] = false;
    }
    else{
      // There is no need to send an empty message, so consider the
      // corresponding receive as already completed.
      m_recv_requests[i] = MPI_Request();
      m_recv_requests_done[i] = true;
    }
  }

  // Fill the send buffers from the variable data.
  vs_buf->copyAllSend();

  // Post the sends in non-blocking mode.
  for( Integer i=0; i<nb_message; ++i ){
    Int32 target_rank = vs_buf->targetRank(i);
    auto share_local_buffer = vs_buf->sendBuffer(i).bytes().smallView();
    if (!share_local_buffer.empty()){
      MPI_Request mpi_request;
      mpi_profiling->iSend(share_local_buffer.data(),share_local_buffer.size(),
                           byte_dt,target_rank,message_tag,comm,&mpi_request);
      m_send_requests.add(mpi_request);
    }
  }
  {
    double prepare_time = MPI_Wtime() - begin_prepare_time;
    pm->stat()->add("SyncPrepare",prepare_time,vs_buf->totalSendSize());
  }
}
175
176/*---------------------------------------------------------------------------*/
177/*---------------------------------------------------------------------------*/
178
/*!
 * \brief Waits for completion of the messages posted by beginSynchronize().
 *
 * Pending receive requests are polled with waitSome(); each completed
 * message is copied into the variable data as soon as it arrives, which
 * overlaps the copies with the remaining waits. Once all receives are done,
 * the send requests are waited for, asynchronous buffer copies are fenced
 * with barrier(), and timing statistics are recorded.
 *
 * \param vs_buf synchronization buffer holding per-rank send/receive areas.
 */
void MpiLegacyVariableSynchronizerDispatcher::
endSynchronize(IDataSynchronizeBuffer* vs_buf)
{
  MpiParallelMng* pm = m_mpi_parallel_mng;

  UniqueArray<MPI_Status> mpi_status;
  UniqueArray<int> completed_requests;

  // Subset of m_recv_requests still pending, together with the original
  // message index of each pending request. These are plain locals: the
  // previous 'm_' prefixes were misleading, and two other arrays declared
  // here were never used and have been removed.
  UniqueArray<MPI_Request> remaining_recv_requests;
  UniqueArray<Integer> remaining_recv_request_indexes;
  double copy_time = 0.0;
  double wait_time = 0.0;
  while(1){
    // Collect the receive requests that have not completed yet.
    remaining_recv_requests.clear();
    remaining_recv_request_indexes.clear();
    for( Integer i=0; i<m_recv_requests.size(); ++i ){
      if (!m_recv_requests_done[i]){
        remaining_recv_requests.add(m_recv_requests[i]);
        remaining_recv_request_indexes.add(i);
      }
    }
    Integer nb_remaining_request = remaining_recv_requests.size();
    if (nb_remaining_request==0)
      break;
    int nb_completed_request = 0;
    mpi_status.resize(nb_remaining_request);
    completed_requests.resize(nb_remaining_request);
    {
      // Block until at least one pending receive completes.
      double begin_time = MPI_Wtime();
      m_mpi_parallel_mng->adapter()->getMpiProfiling()->waitSome(nb_remaining_request,remaining_recv_requests.data(),
                                                                 &nb_completed_request,completed_requests.data(),
                                                                 mpi_status.data());
      double end_time = MPI_Wtime();
      wait_time += (end_time-begin_time);
    }
    // Copy the data of every request that just completed.
    for( int z=0; z<nb_completed_request; ++z ){
      int mpi_request_index = completed_requests[z];
      Integer index = remaining_recv_request_indexes[mpi_request_index];

      {
        double begin_time = MPI_Wtime();
        vs_buf->copyReceiveAsync(index);
        double end_time = MPI_Wtime();
        copy_time += (end_time - begin_time);
      }
      // Mark the message as processed so the next pass skips it.
      m_recv_requests_done[index] = true;
    }
  }

  // Wait for the sends to complete.
  mpi_status.resize(m_send_requests.size());
  m_mpi_parallel_mng->adapter()->getMpiProfiling()->waitAll(m_send_requests.size(),m_send_requests.data(),
                                                            mpi_status.data());

  // Make sure the (possibly asynchronous) buffer copies are finished.
  vs_buf->barrier();

  {
    Int64 total_ghost_size = vs_buf->totalReceiveSize();
    Int64 total_share_size = vs_buf->totalSendSize();
    Int64 total_size = total_ghost_size + total_share_size;
    pm->stat()->add("SyncCopy",copy_time,total_ghost_size);
    pm->stat()->add("SyncWait",wait_time,total_size);
  }
}
258
259/*---------------------------------------------------------------------------*/
260/*---------------------------------------------------------------------------*/
261
262} // End namespace Arcane
263
264/*---------------------------------------------------------------------------*/
265/*---------------------------------------------------------------------------*/
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
Buffer générique pour la synchronisation de données.
Interface d'une fabrique dispatcher générique.
Gestionnaire du parallélisme utilisant MPI.
Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
Référence à une instance.
Vecteur 1D de données avec sémantique par valeur (style STL).
void clear()
Supprime les éléments du tableau.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
unsigned char Byte
Type d'un octet.
Definition BaseTypes.h:43
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.