Arcane  v3.14.10.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
MpiVariableSynchronizeDispatcher.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2023 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiVariableSynchronizeDispatcher.cc (C) 2000-2023 */
9/* */
10/* Gestion spécifique MPI des synchronisations des variables. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/FatalErrorException.h"
15#include "arcane/utils/MemoryView.h"
16
17#include "arcane/parallel/mpi/MpiParallelMng.h"
18#include "arcane/parallel/mpi/MpiAdapter.h"
19#include "arcane/parallel/mpi/MpiTimeInterval.h"
20#include "arcane/parallel/IStat.h"
21
22#include "arcane/impl/IDataSynchronizeBuffer.h"
23#include "arcane/impl/IDataSynchronizeImplementation.h"
24
25#include "arccore/message_passing/IRequestList.h"
26
27/*---------------------------------------------------------------------------*/
28/*---------------------------------------------------------------------------*/
29
30/*
31 * Le fonctionnement de l'algorithme de synchronisation est le suivant. Les
32 * trois premiers points sont dans beginSynchronize() et les deux derniers dans
33 * endSynchronize(). Le code actuel ne permet qu'un synchronisation non
34 * bloquante à la fois.
35 *
36 * 1. Poste les messages de réception
37 * 2. Recopie dans les buffers d'envoi les valeurs à envoyer. On le fait après
38 * avoir posté les messages de réception pour faire un peu de recouvrement
39 * entre le calcul et les communications.
40 * 3. Poste les messages d'envoi.
41 * 4. Fait un WaitSome sur les messages de réception. Dès qu'un message arrive,
42 * on recopie le buffer de réception dans le tableau de la variable. On
43 * peut simplifier le code en faisant un WaitAll et en recopiant à la fin
44 * toutes les valeurs.
45 * 5. Fait un WaitAll des messages d'envoi pour libérer les requêtes.
46*/
47/*---------------------------------------------------------------------------*/
48/*---------------------------------------------------------------------------*/
49
50namespace Arcane
51{
52
53/*---------------------------------------------------------------------------*/
54/*---------------------------------------------------------------------------*/
68{
69 public:
70
71 class Factory;
73
74 protected:
75
76 void compute() override {}
77 void beginSynchronize(IDataSynchronizeBuffer* ds_buf) override;
78 void endSynchronize(IDataSynchronizeBuffer* ds_buf) override;
79
80 private:
81
82 MpiParallelMng* m_mpi_parallel_mng;
83 UniqueArray<Parallel::Request> m_original_recv_requests;
84 UniqueArray<bool> m_original_recv_requests_done;
85 Ref<Parallel::IRequestList> m_receive_request_list;
86 Ref<Parallel::IRequestList> m_send_request_list;
87};
88
89/*---------------------------------------------------------------------------*/
90/*---------------------------------------------------------------------------*/
91
94{
95 public:
96
98 : m_mpi_parallel_mng(mpi_pm)
99 {}
100
101 Ref<IDataSynchronizeImplementation> createInstance() override
102 {
103 auto* x = new MpiVariableSynchronizeDispatcher(this);
105 }
106
107 public:
108
109 MpiParallelMng* m_mpi_parallel_mng = nullptr;
110};
111
112/*---------------------------------------------------------------------------*/
113/*---------------------------------------------------------------------------*/
114
116arcaneCreateMpiVariableSynchronizerFactory(MpiParallelMng* mpi_pm)
117{
120}
121
122/*---------------------------------------------------------------------------*/
123/*---------------------------------------------------------------------------*/
124
125MpiVariableSynchronizeDispatcher::
126MpiVariableSynchronizeDispatcher(Factory* f)
127: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
128, m_receive_request_list(m_mpi_parallel_mng->createRequestListRef())
129, m_send_request_list(m_mpi_parallel_mng->createRequestListRef())
130{
131}
132
133/*---------------------------------------------------------------------------*/
134/*---------------------------------------------------------------------------*/
135
136void MpiVariableSynchronizeDispatcher::
137beginSynchronize(IDataSynchronizeBuffer* ds_buf)
138{
139 Integer nb_message = ds_buf->nbRank();
140
141 m_send_request_list->clear();
142
143 MpiParallelMng* pm = m_mpi_parallel_mng;
144
145 MP::Mpi::MpiAdapter* mpi_adapter = pm->adapter();
146 const MPI_Datatype mpi_dt = MP::Mpi::MpiBuiltIn::datatype(Byte());
147
148 double prepare_time = 0.0;
149
150 {
151 MpiTimeInterval tit(&prepare_time);
152 constexpr int serialize_tag = 523;
153
154 // Envoie les messages de réception en mode non bloquant
155 m_original_recv_requests_done.resize(nb_message);
156 m_original_recv_requests.resize(nb_message);
157
158 // Poste les messages de réception
159 for (Integer i = 0; i < nb_message; ++i) {
160 Int32 target_rank = ds_buf->targetRank(i);
161 auto buf = ds_buf->receiveBuffer(i).bytes();
162 if (!buf.empty()) {
163 auto req = mpi_adapter->receiveNonBlockingNoStat(buf.data(), buf.size(),
164 target_rank, mpi_dt, serialize_tag);
165 m_original_recv_requests[i] = req;
166 m_original_recv_requests_done[i] = false;
167 }
168 else {
169 // Il n'est pas nécessaire d'envoyer un message vide.
170 // Considère le message comme terminé
171 m_original_recv_requests[i] = Parallel::Request{};
172 m_original_recv_requests_done[i] = true;
173 }
174 }
175
176 // Recopie les buffers d'envoi dans \a var_values
177 ds_buf->copyAllSend();
178
179 // Poste les messages d'envoi en mode non bloquant.
180 for (Integer i = 0; i < nb_message; ++i) {
181 auto buf = ds_buf->sendBuffer(i).bytes();
182 Int32 target_rank = ds_buf->targetRank(i);
183 if (!buf.empty()) {
184 auto request = mpi_adapter->sendNonBlockingNoStat(buf.data(), buf.size(),
185 target_rank, mpi_dt, serialize_tag);
186 m_send_request_list->add(request);
187 }
188 }
189 }
190 pm->stat()->add("SyncPrepare", prepare_time, ds_buf->totalSendSize());
191}
192
193/*---------------------------------------------------------------------------*/
194/*---------------------------------------------------------------------------*/
195
196void MpiVariableSynchronizeDispatcher::
197endSynchronize(IDataSynchronizeBuffer* ds_buf)
198{
199 MpiParallelMng* pm = m_mpi_parallel_mng;
200
201 // On a besoin de conserver l'indice d'origine dans 'SyncBuffer'
202 // de chaque requête pour gérer les copies.
203 UniqueArray<Integer> remaining_original_indexes;
204
205 double copy_time = 0.0;
206 double wait_time = 0.0;
207
208 while (1) {
209 // Créé la liste des requêtes encore active.
210 m_receive_request_list->clear();
211 remaining_original_indexes.clear();
212 for (Integer i = 0, n = m_original_recv_requests_done.size(); i < n; ++i) {
213 if (!m_original_recv_requests_done[i]) {
214 m_receive_request_list->add(m_original_recv_requests[i]);
215 remaining_original_indexes.add(i);
216 }
217 }
218 Integer nb_remaining_request = m_receive_request_list->size();
219 if (nb_remaining_request == 0)
220 break;
221
222 {
223 MpiTimeInterval tit(&wait_time);
224 m_receive_request_list->wait(Parallel::WaitSome);
225 }
226
227 // Pour chaque requête terminée, effectue la copie
228 ConstArrayView<Int32> done_requests = m_receive_request_list->doneRequestIndexes();
229
230 for (Int32 request_index : done_requests) {
231 Int32 orig_index = remaining_original_indexes[request_index];
232
233 // Pour indiquer que c'est fini
234 m_original_recv_requests_done[orig_index] = true;
235
236 // Recopie les valeurs recues
237 {
238 MpiTimeInterval tit(&copy_time);
239 ds_buf->copyReceiveAsync(orig_index);
240 }
241 }
242 }
243
244 // Attend que les envois se terminent.
245 // Il faut le faire pour pouvoir libérer les requêtes même si le message
246 // est arrivé.
247 {
248 MpiTimeInterval tit(&wait_time);
249 m_send_request_list->wait(Parallel::WaitAll);
250 }
251
252 // S'assure que les copies des buffers sont bien terminées
253 ds_buf->barrier();
254
255 Int64 total_ghost_size = ds_buf->totalReceiveSize();
256 Int64 total_share_size = ds_buf->totalSendSize();
257 Int64 total_size = total_ghost_size + total_share_size;
258 pm->stat()->add("SyncCopy", copy_time, total_ghost_size);
259 pm->stat()->add("SyncWait", wait_time, total_size);
260}
261
262/*---------------------------------------------------------------------------*/
263/*---------------------------------------------------------------------------*/
264
265} // End namespace Arcane
266
267/*---------------------------------------------------------------------------*/
268/*---------------------------------------------------------------------------*/
Buffer générique pour la synchronisation de données.
Interface d'une fabrique dispatcher générique.
virtual IStat * stat()=0
Gestionnaire des statistiques.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:120
Gestionnaire du parallélisme utilisant MPI.
Implémentation optimisée pour MPI de la synchronisation.
Integer size() const
Nombre d'éléments du vecteur.
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
unsigned char Byte
Type d'un octet.
Definition UtilsTypes.h:142
Int32 Integer
Type représentant un entier.