Arcane  v3.16.8.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
DataSynchronizeDispatcher.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* DataSynchronizeDispatcher.cc (C) 2000-2025 */
9/* */
10/* Gestion de la synchronisation d'une instance de 'IData'. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/impl/internal/IDataSynchronizeDispatcher.h"
15
16#include "arcane/utils/FatalErrorException.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/MemoryView.h"
19#include "arcane/utils/ValueConvert.h"
20#include "arcane/utils/ITraceMng.h"
21#include "arcane/utils/internal/MemoryBuffer.h"
22
23#include "arcane/core/ParallelMngUtils.h"
24#include "arcane/core/IParallelExchanger.h"
25#include "arcane/core/ISerializeMessage.h"
26#include "arcane/core/ISerializer.h"
27#include "arcane/core/IParallelMng.h"
28#include "arcane/core/IData.h"
29#include "arcane/core/internal/IDataInternal.h"
30
31#include "arcane/impl/DataSynchronizeInfo.h"
32#include "arcane/impl/internal/DataSynchronizeBuffer.h"
33
34/*---------------------------------------------------------------------------*/
35/*---------------------------------------------------------------------------*/
36
37namespace Arcane
38{
39
40namespace
41{
43 _toLegacySmallView(MutableMemoryView memory_view)
44 {
45 Span<std::byte> bytes = memory_view.bytes();
46 void* data = bytes.data();
47 Int32 size = bytes.smallView().size();
48 return { size, reinterpret_cast<Byte*>(data) };
49 }
50} // namespace
51
52/*---------------------------------------------------------------------------*/
53/*---------------------------------------------------------------------------*/
54
55/*---------------------------------------------------------------------------*/
56/*---------------------------------------------------------------------------*/
57
58class DataSynchronizeDispatcherBase
59{
60 public:
61
62 explicit DataSynchronizeDispatcherBase(const DataSynchronizeDispatcherBuildInfo& bi);
63 ~DataSynchronizeDispatcherBase();
64
65 protected:
66
67 IParallelMng* m_parallel_mng = nullptr;
68 Runner* m_runner = nullptr;
69 Ref<DataSynchronizeInfo> m_sync_info;
70 Ref<IDataSynchronizeImplementation> m_synchronize_implementation;
71
72 protected:
73
74 void _compute();
75};
76
77/*---------------------------------------------------------------------------*/
78/*---------------------------------------------------------------------------*/
79
80DataSynchronizeDispatcherBase::
81DataSynchronizeDispatcherBase(const DataSynchronizeDispatcherBuildInfo& bi)
82: m_parallel_mng(bi.parallelMng())
83, m_sync_info(bi.synchronizeInfo())
84, m_synchronize_implementation(bi.synchronizeImplementation())
85{
86}
87
88/*---------------------------------------------------------------------------*/
89/*---------------------------------------------------------------------------*/
90
91DataSynchronizeDispatcherBase::
92~DataSynchronizeDispatcherBase()
93{
94}
95
96/*---------------------------------------------------------------------------*/
97/*---------------------------------------------------------------------------*/
103_compute()
104{
105 m_synchronize_implementation->compute();
106}
107
108/*---------------------------------------------------------------------------*/
109/*---------------------------------------------------------------------------*/
110
111/*---------------------------------------------------------------------------*/
112/*---------------------------------------------------------------------------*/
116class ARCANE_IMPL_EXPORT DataSynchronizeDispatcher
117: private ReferenceCounterImpl
118, public DataSynchronizeDispatcherBase
120{
122
123 public:
124
125 explicit DataSynchronizeDispatcher(const DataSynchronizeDispatcherBuildInfo& bi)
126 : DataSynchronizeDispatcherBase(bi)
127 , m_sync_buffer(bi.parallelMng()->traceMng(), m_sync_info.get(), bi.bufferCopier())
128 {
129 }
130
131 public:
132
133 void compute() override { _compute(); }
134 void setSynchronizeBuffer(Ref<MemoryBuffer> buffer) override { m_sync_buffer.setSynchronizeBuffer(buffer); }
135 void beginSynchronize(INumericDataInternal* data, bool is_compare_sync) override;
136 DataSynchronizeResult endSynchronize() override;
137
138 private:
139
142 bool m_is_in_sync = false;
143 bool m_is_empty_sync = false;
144};
145
146/*---------------------------------------------------------------------------*/
147/*---------------------------------------------------------------------------*/
148
149/*---------------------------------------------------------------------------*/
150/*---------------------------------------------------------------------------*/
151
153beginSynchronize(INumericDataInternal* data, bool is_compare_sync)
154{
156
157 MutableMemoryView mem_view = data->memoryView();
158
159 if (m_is_in_sync)
160 ARCANE_FATAL("_beginSynchronize() has already been called");
161 m_is_in_sync = true;
162
163 m_is_empty_sync = (mem_view.bytes().size() == 0);
164 if (m_is_empty_sync)
165 return;
166 m_sync_buffer.setDataView(mem_view);
167 m_sync_buffer.prepareSynchronize(is_compare_sync);
168 m_synchronize_implementation->beginSynchronize(&m_sync_buffer);
169}
170
171/*---------------------------------------------------------------------------*/
172/*---------------------------------------------------------------------------*/
173
176{
177 if (!m_is_in_sync)
178 ARCANE_FATAL("No pending synchronize(). You need to call beginSynchronize() before");
180 if (!m_is_empty_sync) {
181 m_synchronize_implementation->endSynchronize(&m_sync_buffer);
182 result = m_sync_buffer.finalizeSynchronize();
183 }
184 m_is_in_sync = false;
185 return result;
186}
187
188/*---------------------------------------------------------------------------*/
189/*---------------------------------------------------------------------------*/
190
191/*---------------------------------------------------------------------------*/
192/*---------------------------------------------------------------------------*/
193
194Ref<IDataSynchronizeDispatcher> IDataSynchronizeDispatcher::
195create(const DataSynchronizeDispatcherBuildInfo& build_info)
196{
198}
199
200/*---------------------------------------------------------------------------*/
201/*---------------------------------------------------------------------------*/
202
203/*---------------------------------------------------------------------------*/
204/*---------------------------------------------------------------------------*/
208class ARCANE_IMPL_EXPORT DataSynchronizeMultiDispatcher
210{
211 public:
212
213 explicit DataSynchronizeMultiDispatcher(const DataSynchronizeDispatcherBuildInfo& bi)
214 : m_parallel_mng(bi.parallelMng())
215 , m_sync_info(bi.synchronizeInfo())
216 {
217 }
218
219 void compute() override {}
221 void synchronize(ConstArrayView<IVariable*> vars) override;
222
223 private:
224
225 IParallelMng* m_parallel_mng = nullptr;
226 Ref<DataSynchronizeInfo> m_sync_info;
227};
228
229/*---------------------------------------------------------------------------*/
230/*---------------------------------------------------------------------------*/
231
232void DataSynchronizeMultiDispatcher::
233synchronize(ConstArrayView<IVariable*> vars)
234{
235 Ref<IParallelExchanger> exchanger{ ParallelMngUtils::createExchangerRef(m_parallel_mng) };
236 Integer nb_rank = m_sync_info->size();
237 Int32UniqueArray recv_ranks(nb_rank);
238 for (Integer i = 0; i < nb_rank; ++i) {
239 Int32 rank = m_sync_info->targetRank(i);
240 exchanger->addSender(rank);
241 recv_ranks[i] = rank;
242 }
243 exchanger->initializeCommunicationsMessages(recv_ranks);
244 for (Integer i = 0; i < nb_rank; ++i) {
245 ISerializeMessage* msg = exchanger->messageToSend(i);
246 ISerializer* sbuf = msg->serializer();
247 Int32ConstArrayView share_ids = m_sync_info->sendInfo().localIds(i);
248 sbuf->setMode(ISerializer::ModeReserve);
249 for (IVariable* var : vars) {
250 var->serialize(sbuf, share_ids, nullptr);
251 }
252 sbuf->allocateBuffer();
253 sbuf->setMode(ISerializer::ModePut);
254 for (IVariable* var : vars) {
255 var->serialize(sbuf, share_ids, nullptr);
256 }
257 }
258 exchanger->processExchange();
259 for (Integer i = 0; i < nb_rank; ++i) {
260 ISerializeMessage* msg = exchanger->messageToReceive(i);
261 ISerializer* sbuf = msg->serializer();
262 Int32ConstArrayView ghost_ids = m_sync_info->receiveInfo().localIds(i);
263 sbuf->setMode(ISerializer::ModeGet);
264 for (IVariable* var : vars) {
265 var->serialize(sbuf, ghost_ids, nullptr);
266 }
267 }
268}
269
270/*---------------------------------------------------------------------------*/
271/*---------------------------------------------------------------------------*/
272
273/*---------------------------------------------------------------------------*/
274/*---------------------------------------------------------------------------*/
281class ARCANE_IMPL_EXPORT DataSynchronizeMultiDispatcherV2
282: public DataSynchronizeDispatcherBase
284{
285 public:
286
287 explicit DataSynchronizeMultiDispatcherV2(const DataSynchronizeDispatcherBuildInfo& bi)
288 : DataSynchronizeDispatcherBase(bi)
289 , m_sync_buffer(bi.parallelMng()->traceMng(), m_sync_info.get(), bi.bufferCopier())
290 {
291 }
292
293 void compute() override { _compute(); }
294 void setSynchronizeBuffer(Ref<MemoryBuffer> buffer) override { m_sync_buffer.setSynchronizeBuffer(buffer); }
295 void synchronize(ConstArrayView<IVariable*> vars) override;
296
297 private:
298
299 MultiDataSynchronizeBuffer m_sync_buffer;
300};
301
302/*---------------------------------------------------------------------------*/
303/*---------------------------------------------------------------------------*/
304
305void DataSynchronizeMultiDispatcherV2::
306synchronize(ConstArrayView<IVariable*> vars)
307{
308 const Int32 nb_var = vars.size();
309 m_sync_buffer.setNbData(nb_var);
310
311 // Récupère les emplacements mémoire des données des variables et leur taille
312 {
313 Int32 index = 0;
314 for (IVariable* var : vars) {
315 INumericDataInternal* numapi = var->data()->_commonInternal()->numericData();
316 if (!numapi)
317 ARCANE_FATAL("Variable '{0}' can not be synchronized because it is not a numeric data", var->name());
318 MutableMemoryView mem_view = numapi->memoryView();
319 m_sync_buffer.setDataView(index, mem_view);
320 ++index;
321 }
322 }
323
324 // TODO: à passer en paramètre de la fonction
325 bool is_compare_sync = false;
326 m_sync_buffer.prepareSynchronize(is_compare_sync);
327
328 m_synchronize_implementation->beginSynchronize(&m_sync_buffer);
329 m_synchronize_implementation->endSynchronize(&m_sync_buffer);
330}
331
332/*---------------------------------------------------------------------------*/
333/*---------------------------------------------------------------------------*/
334
335/*---------------------------------------------------------------------------*/
336/*---------------------------------------------------------------------------*/
342class SimpleDataSynchronizeImplementation
344{
345 public:
346
347 class Factory;
348 explicit SimpleDataSynchronizeImplementation(Factory* f);
349
350 protected:
351
352 void compute() override {}
353 void beginSynchronize(IDataSynchronizeBuffer* buf) override;
354 void endSynchronize(IDataSynchronizeBuffer* buf) override;
355
356 private:
357
358 IParallelMng* m_parallel_mng = nullptr;
359 UniqueArray<Parallel::Request> m_all_requests;
360};
361
362/*---------------------------------------------------------------------------*/
363/*---------------------------------------------------------------------------*/
364
367{
368 public:
369
370 explicit Factory(IParallelMng* pm)
371 : m_parallel_mng(pm)
372 {}
373
374 Ref<IDataSynchronizeImplementation> createInstance() override
375 {
376 auto* x = new SimpleDataSynchronizeImplementation(this);
378 }
379
380 public:
381
382 IParallelMng* m_parallel_mng = nullptr;
383};
384
385/*---------------------------------------------------------------------------*/
386/*---------------------------------------------------------------------------*/
387
388SimpleDataSynchronizeImplementation::
389SimpleDataSynchronizeImplementation(Factory* f)
390: m_parallel_mng(f->m_parallel_mng)
391{
392}
393
394/*---------------------------------------------------------------------------*/
395/*---------------------------------------------------------------------------*/
396
398arcaneCreateSimpleVariableSynchronizerFactory(IParallelMng* pm)
399{
402}
403
404/*---------------------------------------------------------------------------*/
405/*---------------------------------------------------------------------------*/
406
407void SimpleDataSynchronizeImplementation::
408beginSynchronize(IDataSynchronizeBuffer* vs_buf)
409{
410 ARCANE_CHECK_POINTER(vs_buf);
411 IParallelMng* pm = m_parallel_mng;
412
413 const bool use_blocking_send = false;
414 Int32 nb_message = vs_buf->nbRank();
415
416 /*pm->traceMng()->info() << " ** ** COMMON BEGIN SYNC n=" << nb_message
417 << " this=" << (IVariableSynchronizeDispatcher*)this
418 << " m_sync_info=" << &this->m_sync_info;*/
419
420 // Envoie les messages de réception non bloquant
421 for (Integer i = 0; i < nb_message; ++i) {
422 Int32 target_rank = vs_buf->targetRank(i);
423 auto buf = _toLegacySmallView(vs_buf->receiveBuffer(i));
424 if (!buf.empty()) {
425 Parallel::Request rval = pm->recv(buf, target_rank, false);
426 m_all_requests.add(rval);
427 }
428 }
429
430 vs_buf->copyAllSend();
431
432 // Envoie les messages d'envoi en mode non bloquant.
433 for (Integer i = 0; i < nb_message; ++i) {
434 Int32 target_rank = vs_buf->targetRank(i);
435 auto buf = _toLegacySmallView(vs_buf->sendBuffer(i));
436
437 //ConstArrayView<SimpleType> const_share = share_local_buffer;
438 if (!buf.empty()) {
439 //for( Integer i=0, is=share_local_buffer.size(); i<is; ++i )
440 //trace->info() << "TO rank=" << vsi.m_target_rank << " I=" << i << " V=" << share_local_buffer[i]
441 // << " lid=" << share_grp[i] << " v2=" << var_values[share_grp[i]];
442 Parallel::Request rval = pm->send(buf, target_rank, use_blocking_send);
443 if (!use_blocking_send)
444 m_all_requests.add(rval);
445 }
446 }
447}
448
449/*---------------------------------------------------------------------------*/
450/*---------------------------------------------------------------------------*/
451
452void SimpleDataSynchronizeImplementation::
453endSynchronize(IDataSynchronizeBuffer* vs_buf)
454{
455 IParallelMng* pm = m_parallel_mng;
456
457 /*pm->traceMng()->info() << " ** ** COMMON END SYNC n=" << nb_message
458 << " this=" << (IVariableSynchronizeDispatcher*)this
459 << " m_sync_info=" << &this->m_sync_info;*/
460
461 // Attend que les réceptions se terminent
462 pm->waitAllRequests(m_all_requests);
463 m_all_requests.clear();
464
465 // Recopie dans la variable le message de retour.
466 vs_buf->copyAllReceive();
467}
468
469/*---------------------------------------------------------------------------*/
470/*---------------------------------------------------------------------------*/
471
472/*---------------------------------------------------------------------------*/
473/*---------------------------------------------------------------------------*/
474
475IDataSynchronizeMultiDispatcher* IDataSynchronizeMultiDispatcher::
476create(const DataSynchronizeDispatcherBuildInfo& bi)
477{
478 // TODO: Une fois qu'on aura supprimer l'ancien mécanisme, il faudra
479 // modifier l'API ne pas utiliser 'VariableCollection' mais une liste
480 // de \a INumericDataInternal
481 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCANE_USE_LEGACY_MULTISYNCHRONIZE", true))
482 if (v.value() >= 1)
483 return new DataSynchronizeMultiDispatcher(bi);
484 return new DataSynchronizeMultiDispatcherV2(bi);
485}
486
487/*---------------------------------------------------------------------------*/
488/*---------------------------------------------------------------------------*/
489
490} // namespace Arcane
491
492/*---------------------------------------------------------------------------*/
493/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCCORE_DEFINE_REFERENCE_COUNTED_INCLASS_METHODS()
Macro pour définir les méthodes gérant les compteurs de référence.
Gestionnaire d'exécution pour accélérateur.
Definition core/Runner.h:68
Vue modifiable d'un tableau d'un type T.
Vue constante d'un tableau de type T.
static std::optional< Int32 > tryParseFromEnvironment(StringView s, bool throw_if_invalid)
Definition Convert.cc:122
void _compute()
Notifie l'implémentation que les informations de synchronisation ont changé.
Infos pour construire un DataSynchronizeDispatcher.
Gestion de la synchronisation pour une donnée.
DataSynchronizeResult endSynchronize() override
Termine la synchronisation.
void setSynchronizeBuffer(Ref< MemoryBuffer > buffer) override
Positionne le buffer de synchronisation.
void compute() override
Recalcule les informations nécessaires après une mise à jour des informations de DataSynchronizeInfo.
SingleDataSynchronizeBuffer m_sync_buffer
Gère les buffers d'envoi et réception pour la synchronisation.
void beginSynchronize(INumericDataInternal *data, bool is_compare_sync) override
Commence l'exécution pour la synchronisation pour la donnée data.
void compute() override
Recalcule les informations nécessaires après une mise à jour des informations de DataSynchronizeInfo.
void setSynchronizeBuffer(Ref< MemoryBuffer > buffer) override
Positionne le buffer de synchronisation.
void compute() override
Recalcule les informations nécessaires après une mise à jour des informations de DataSynchronizeInfo.
void setSynchronizeBuffer(Ref< MemoryBuffer >) override
Positionne le buffer de synchronisation.
Informations sur le résultat d'une synchronisation.
Buffer générique pour la synchronisation de données.
Interface pour gérer la synchronisation d'une donnée.
Interface d'une fabrique dispatcher générique.
Interface de la synchronisation d'une liste de variables.
Interface pour un 'IData' d'un type numérique.
virtual MutableMemoryView memoryView()=0
Vue mémoire sur la donnée.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual ITraceMng * traceMng() const =0
Gestionnaire de traces.
virtual void waitAllRequests(ArrayView< Request > rvalues)=0
Bloque en attendant que les requêtes rvalues soient terminées.
@ ModePut
Le sérialiseur attend des reserve()
Implémentation de IDataSynchronizeBuffer pour plusieurs données.
Vue modifiable sur une zone mémoire contigue contenant des éléments de taille fixe.
Definition MemoryView.h:156
constexpr SpanType bytes() const
Vue sous forme d'octets.
Definition MemoryView.h:215
Référence à une instance.
Implémentation thread-safe d'un compteur de référence.
Implémentation de IDataSynchronizeBuffer pour une donnée.
constexpr __host__ __device__ pointer data() const noexcept
Pointeur sur le début de la vue.
Definition Span.h:422
Vue d'un tableau d'éléments de type T.
Definition Span.h:513
Vecteur 1D de données avec sémantique par valeur (style STL).
Ref< IParallelExchanger > createExchangerRef(IParallelMng *pm)
Retourne une interface pour transférer des messages entre rangs.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Int32 Integer
Type représentant un entier.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:569
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:428
unsigned char Byte
Type d'un octet.
Definition BaseTypes.h:43
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.