Arcane  v3.15.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
MpiBlockVariableSynchronizeDispatcher.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2023 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiBlockVariableSynchronizeDispatcher.cc (C) 2000-2023 */
9/* */
10/* Gestion spécifique MPI des synchronisations des variables. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/FatalErrorException.h"
15#include "arcane/utils/MemoryView.h"
16
17#include "arcane/parallel/mpi/MpiParallelMng.h"
18#include "arcane/parallel/mpi/MpiAdapter.h"
19#include "arcane/parallel/mpi/MpiTimeInterval.h"
20#include "arcane/parallel/IStat.h"
21
22#include "arcane/impl/IDataSynchronizeBuffer.h"
23#include "arcane/impl/IDataSynchronizeImplementation.h"
24
25#include "arccore/message_passing/IRequestList.h"
26
27/*---------------------------------------------------------------------------*/
28/*---------------------------------------------------------------------------*/
29/*
30 * Cette implémentation découpe la synchronisation en bloc de taille fixe.
31 * Tout le mécanisme est dans _endSynchronize().
32 * L'algorithme est le suivant:
33 *
34 * 1. Recopie dans les buffers d'envoi les valeurs à envoyer.
35 * 2. Boucle sur Irecv/ISend/WaitAll tant qu'il y a au moins une partie non vide.
36 * 3. Recopie depuis les buffers de réception les valeurs des variables.
37*/
38/*---------------------------------------------------------------------------*/
39/*---------------------------------------------------------------------------*/
40
41namespace Arcane
42{
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
57{
58 public:
59
60 class Factory;
62
63 public:
64
65 void compute() override {}
66 void beginSynchronize(IDataSynchronizeBuffer* buf) override;
67 void endSynchronize(IDataSynchronizeBuffer* buf) override;
68
69 private:
70
71 MpiParallelMng* m_mpi_parallel_mng = nullptr;
72 Ref<Parallel::IRequestList> m_request_list;
73 Int32 m_block_size;
74 Int32 m_nb_sequence;
75
76 private:
77
78 bool _isSkipRank(Int32 rank, Int32 sequence) const;
79};
80
81/*---------------------------------------------------------------------------*/
82/*---------------------------------------------------------------------------*/
83
86{
87 public:
88
89 Factory(MpiParallelMng* mpi_pm, Int32 block_size, Int32 nb_sequence)
90 : m_mpi_parallel_mng(mpi_pm)
91 , m_block_size(block_size)
92 , m_nb_sequence(nb_sequence)
93 {}
94
95 Ref<IDataSynchronizeImplementation> createInstance() override
96 {
99 }
100
101 public:
102
103 MpiParallelMng* m_mpi_parallel_mng = nullptr;
104 Int32 m_block_size = 0;
105 Int32 m_nb_sequence = 0;
106};
107
108/*---------------------------------------------------------------------------*/
109/*---------------------------------------------------------------------------*/
110
112arcaneCreateMpiBlockVariableSynchronizerFactory(MpiParallelMng* mpi_pm, Int32 block_size, Int32 nb_sequence)
113{
116}
117
118/*---------------------------------------------------------------------------*/
119/*---------------------------------------------------------------------------*/
120
121MpiBlockVariableSynchronizerDispatcher::
122MpiBlockVariableSynchronizerDispatcher(Factory* f)
123: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
124, m_request_list(m_mpi_parallel_mng->createRequestListRef())
125, m_block_size(f->m_block_size)
126, m_nb_sequence(f->m_nb_sequence)
127{
128}
129
130/*---------------------------------------------------------------------------*/
131/*---------------------------------------------------------------------------*/
132
133bool MpiBlockVariableSynchronizerDispatcher::
134_isSkipRank(Int32 rank, Int32 sequence) const
135{
136 if (m_nb_sequence == 1)
137 return false;
138 return (rank % m_nb_sequence) == sequence;
139}
140
141/*---------------------------------------------------------------------------*/
142/*---------------------------------------------------------------------------*/
143
144void MpiBlockVariableSynchronizerDispatcher::
145beginSynchronize(IDataSynchronizeBuffer* vs_buf)
146{
147 // Ne fait rien au niveau MPI dans cette partie car cette implémentation
148 // ne supporte pas l'asyncrhonisme.
149 // On se contente de recopier les valeurs des variables dans le buffer d'envoi
150 // pour permettre ensuite de modifier les valeurs de la variable entre
151 // le _beginSynchronize() et le _endSynchronize().
152
153 double send_copy_time = 0.0;
154 {
155 MpiTimeInterval tit(&send_copy_time);
156 // Recopie les buffers d'envoi
157 vs_buf->copyAllSend();
158 }
159 Int64 total_share_size = vs_buf->totalSendSize();
160 m_mpi_parallel_mng->stat()->add("SyncSendCopy", send_copy_time, total_share_size);
161}
162
163/*---------------------------------------------------------------------------*/
164/*---------------------------------------------------------------------------*/
165
166void MpiBlockVariableSynchronizerDispatcher::
167endSynchronize(IDataSynchronizeBuffer* vs_buf)
168{
169 const Int32 nb_message = vs_buf->nbRank();
170
171 MpiParallelMng* pm = m_mpi_parallel_mng;
172 Int32 my_rank = pm->commRank();
173
174 MP::Mpi::MpiAdapter* mpi_adapter = pm->adapter();
175 const MPI_Datatype mpi_dt = MP::Mpi::MpiBuiltIn::datatype(Byte());
176
177 double prepare_time = 0.0;
178 double copy_time = 0.0;
179 double wait_time = 0.0;
180
181 constexpr int serialize_tag = 523;
182
183 const Int32 block_size = m_block_size;
184
185 for (Int32 isequence = 0; isequence < m_nb_sequence; ++isequence) {
186 Int32 block_index = 0;
187 while (1) {
188 {
189 MpiTimeInterval tit(&prepare_time);
190 m_request_list->clear();
191
192 // Poste les messages de réception
193 for (Integer i = 0; i < nb_message; ++i) {
194 Int32 target_rank = vs_buf->targetRank(i);
195 if (_isSkipRank(target_rank, isequence))
196 continue;
197 auto buf0 = vs_buf->receiveBuffer(i).bytes();
198 auto buf = buf0.subSpan(block_index, block_size);
199 if (!buf.empty()) {
200 auto req = mpi_adapter->receiveNonBlockingNoStat(buf.data(), buf.size(),
201 target_rank, mpi_dt, serialize_tag);
202 m_request_list->add(req);
203 }
204 }
205
206 // Poste les messages d'envoi en mode non bloquant.
207 for (Integer i = 0; i < nb_message; ++i) {
208 Int32 target_rank = vs_buf->targetRank(i);
209 if (_isSkipRank(my_rank, isequence))
210 continue;
211 auto buf0 = vs_buf->sendBuffer(i).bytes();
212 auto buf = buf0.subSpan(block_index, block_size);
213 if (!buf.empty()) {
214 auto request = mpi_adapter->sendNonBlockingNoStat(buf.data(), buf.size(),
215 target_rank, mpi_dt, serialize_tag);
216 m_request_list->add(request);
217 }
218 }
219 }
220
221 // Si aucune requête alors on a fini notre synchronisation
222 if (m_request_list->size() == 0)
223 break;
224
225 // Attend que les messages soient terminés
226 {
227 MpiTimeInterval tit(&wait_time);
228 m_request_list->wait(Parallel::WaitAll);
229 }
230
231 block_index += block_size;
232 }
233 }
234
235 // Recopie les valeurs recues
236 {
237 MpiTimeInterval tit(&copy_time);
238 vs_buf->copyAllReceive();
239 }
240
241 Int64 total_ghost_size = vs_buf->totalReceiveSize();
242 Int64 total_share_size = vs_buf->totalSendSize();
243 Int64 total_size = total_ghost_size + total_share_size;
244 pm->stat()->add("SyncCopy", copy_time, total_ghost_size);
245 pm->stat()->add("SyncWait", wait_time, total_size);
246 pm->stat()->add("SyncPrepare", prepare_time, total_share_size);
247}
248
249/*---------------------------------------------------------------------------*/
250/*---------------------------------------------------------------------------*/
251
252} // End namespace Arcane
253
254/*---------------------------------------------------------------------------*/
255/*---------------------------------------------------------------------------*/
Buffer générique pour la synchronisation de données.
Interface d'une fabrique dispatcher générique.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual IStat * stat()=0
Gestionnaire des statistiques.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:149
Implémentation par block pour MPI de la synchronisation.
Gestionnaire du parallélisme utilisant MPI.
Parallel::IStat * stat() override
Gestionnaire des statistiques.
virtual void add(const String &name, double elapsed_time, Int64 msg_size)=0
Ajoute une statistique.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
unsigned char Byte
Type d'un octet.
Definition UtilsTypes.h:148