Arcane  v3.14.10.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
DataSynchronizeDispatcher.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2023 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* DataSynchronizeDispatcher.cc (C) 2000-2023 */
9/* */
10/* Gestion de la synchronisation d'une instance de 'IData'. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/impl/internal/IDataSynchronizeDispatcher.h"
15
16#include "arcane/utils/FatalErrorException.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/MemoryView.h"
19#include "arcane/utils/ValueConvert.h"
20#include "arcane/utils/ITraceMng.h"
21#include "arcane/utils/internal/MemoryBuffer.h"
22
23#include "arcane/core/ParallelMngUtils.h"
24#include "arcane/core/IParallelExchanger.h"
25#include "arcane/core/ISerializeMessage.h"
26#include "arcane/core/ISerializer.h"
27#include "arcane/core/IParallelMng.h"
28#include "arcane/core/IData.h"
29#include "arcane/core/internal/IDataInternal.h"
30
31#include "arcane/impl/DataSynchronizeInfo.h"
32#include "arcane/impl/internal/DataSynchronizeBuffer.h"
33#include "arcane/impl/internal/IBufferCopier.h"
34
35/*---------------------------------------------------------------------------*/
36/*---------------------------------------------------------------------------*/
37
38namespace Arcane
39{
40
41namespace
42{
43 ArrayView<Byte>
44 _toLegacySmallView(MutableMemoryView memory_view)
45 {
46 Span<std::byte> bytes = memory_view.bytes();
47 void* data = bytes.data();
48 Int32 size = bytes.smallView().size();
49 return { size, reinterpret_cast<Byte*>(data) };
50 }
51} // namespace
52
53/*---------------------------------------------------------------------------*/
54/*---------------------------------------------------------------------------*/
55
56/*---------------------------------------------------------------------------*/
57/*---------------------------------------------------------------------------*/
58
60{
61 public:
62
65
66 protected:
67
68 IParallelMng* m_parallel_mng = nullptr;
69 Runner* m_runner = nullptr;
70 Ref<DataSynchronizeInfo> m_sync_info;
71 Ref<IDataSynchronizeImplementation> m_synchronize_implementation;
72
73 protected:
74
75 void _compute();
76};
77
78/*---------------------------------------------------------------------------*/
79/*---------------------------------------------------------------------------*/
80
81DataSynchronizeDispatcherBase::
82DataSynchronizeDispatcherBase(const DataSynchronizeDispatcherBuildInfo& bi)
83: m_parallel_mng(bi.parallelMng())
84, m_sync_info(bi.synchronizeInfo())
85, m_synchronize_implementation(bi.synchronizeImplementation())
86{
87}
88
89/*---------------------------------------------------------------------------*/
90/*---------------------------------------------------------------------------*/
91
92DataSynchronizeDispatcherBase::
93~DataSynchronizeDispatcherBase()
94{
95}
96
97/*---------------------------------------------------------------------------*/
98/*---------------------------------------------------------------------------*/
104_compute()
105{
106 m_synchronize_implementation->compute();
107}
108
109/*---------------------------------------------------------------------------*/
110/*---------------------------------------------------------------------------*/
111
112/*---------------------------------------------------------------------------*/
113/*---------------------------------------------------------------------------*/
117class ARCANE_IMPL_EXPORT DataSynchronizeDispatcher
118: private ReferenceCounterImpl
121{
123
124 public:
125
128 , m_sync_buffer(m_sync_info.get(), bi.bufferCopier())
129 {
130 }
131
132 public:
133
134 void compute() override { _compute(); }
135 void setSynchronizeBuffer(Ref<MemoryBuffer> buffer) override { m_sync_buffer.setSynchronizeBuffer(buffer); }
136 void beginSynchronize(INumericDataInternal* data, bool is_compare_sync) override;
137 DataSynchronizeResult endSynchronize() override;
138
139 private:
140
143 bool m_is_in_sync = false;
144 bool m_is_empty_sync = false;
145};
146
147/*---------------------------------------------------------------------------*/
148/*---------------------------------------------------------------------------*/
149
150/*---------------------------------------------------------------------------*/
151/*---------------------------------------------------------------------------*/
152
155{
157
159 Int32 full_datatype_size = mem_view.datatypeSize();
160
161 if (m_is_in_sync)
162 ARCANE_FATAL("_beginSynchronize() has already been called");
163 m_is_in_sync = true;
164
165 m_is_empty_sync = (mem_view.bytes().size() == 0);
166 if (m_is_empty_sync)
167 return;
168 m_sync_buffer.setDataView(mem_view);
170 m_synchronize_implementation->beginSynchronize(&m_sync_buffer);
171}
172
173/*---------------------------------------------------------------------------*/
174/*---------------------------------------------------------------------------*/
175
178{
179 if (!m_is_in_sync)
180 ARCANE_FATAL("No pending synchronize(). You need to call beginSynchronize() before");
182 if (!m_is_empty_sync) {
183 m_synchronize_implementation->endSynchronize(&m_sync_buffer);
185 }
186 m_is_in_sync = false;
187 return result;
188}
189
190/*---------------------------------------------------------------------------*/
191/*---------------------------------------------------------------------------*/
192
193/*---------------------------------------------------------------------------*/
194/*---------------------------------------------------------------------------*/
195
196Ref<IDataSynchronizeDispatcher> IDataSynchronizeDispatcher::
198{
200}
201
202/*---------------------------------------------------------------------------*/
203/*---------------------------------------------------------------------------*/
204
205/*---------------------------------------------------------------------------*/
206/*---------------------------------------------------------------------------*/
210class ARCANE_IMPL_EXPORT DataSynchronizeMultiDispatcher
212{
213 public:
214
216 : m_parallel_mng(bi.parallelMng())
217 , m_sync_info(bi.synchronizeInfo())
218 {
219 }
220
221 void compute() override {}
223 void synchronize(ConstArrayView<IVariable*> vars) override;
224
225 private:
226
227 IParallelMng* m_parallel_mng = nullptr;
228 Ref<DataSynchronizeInfo> m_sync_info;
229};
230
231/*---------------------------------------------------------------------------*/
232/*---------------------------------------------------------------------------*/
233
234void DataSynchronizeMultiDispatcher::
235synchronize(ConstArrayView<IVariable*> vars)
236{
237 Ref<IParallelExchanger> exchanger{ ParallelMngUtils::createExchangerRef(m_parallel_mng) };
238 Integer nb_rank = m_sync_info->size();
239 Int32UniqueArray recv_ranks(nb_rank);
240 for (Integer i = 0; i < nb_rank; ++i) {
241 Int32 rank = m_sync_info->targetRank(i);
242 exchanger->addSender(rank);
243 recv_ranks[i] = rank;
244 }
245 exchanger->initializeCommunicationsMessages(recv_ranks);
246 for (Integer i = 0; i < nb_rank; ++i) {
247 ISerializeMessage* msg = exchanger->messageToSend(i);
248 ISerializer* sbuf = msg->serializer();
249 Int32ConstArrayView share_ids = m_sync_info->sendInfo().localIds(i);
250 sbuf->setMode(ISerializer::ModeReserve);
251 for (IVariable* var : vars) {
252 var->serialize(sbuf, share_ids, nullptr);
253 }
254 sbuf->allocateBuffer();
255 sbuf->setMode(ISerializer::ModePut);
256 for (IVariable* var : vars) {
257 var->serialize(sbuf, share_ids, nullptr);
258 }
259 }
260 exchanger->processExchange();
261 for (Integer i = 0; i < nb_rank; ++i) {
262 ISerializeMessage* msg = exchanger->messageToReceive(i);
263 ISerializer* sbuf = msg->serializer();
264 Int32ConstArrayView ghost_ids = m_sync_info->receiveInfo().localIds(i);
265 sbuf->setMode(ISerializer::ModeGet);
266 for (IVariable* var : vars) {
267 var->serialize(sbuf, ghost_ids, nullptr);
268 }
269 }
270}
271
272/*---------------------------------------------------------------------------*/
273/*---------------------------------------------------------------------------*/
274
275/*---------------------------------------------------------------------------*/
276/*---------------------------------------------------------------------------*/
283class ARCANE_IMPL_EXPORT DataSynchronizeMultiDispatcherV2
286{
287 public:
288
291 , m_sync_buffer(bi.parallelMng()->traceMng(), m_sync_info.get(), bi.bufferCopier())
292 {
293 }
294
295 void compute() override { _compute(); }
296 void setSynchronizeBuffer(Ref<MemoryBuffer> buffer) override { m_sync_buffer.setSynchronizeBuffer(buffer); }
297 void synchronize(ConstArrayView<IVariable*> vars) override;
298
299 private:
300
301 MultiDataSynchronizeBuffer m_sync_buffer;
302};
303
304/*---------------------------------------------------------------------------*/
305/*---------------------------------------------------------------------------*/
306
307void DataSynchronizeMultiDispatcherV2::
308synchronize(ConstArrayView<IVariable*> vars)
309{
310 const Int32 nb_var = vars.size();
311 m_sync_buffer.setNbData(nb_var);
312
313 // Récupère les emplacements mémoire des données des variables et leur taille
314 Int32 all_datatype_size = 0;
315 {
316 Int32 index = 0;
317 for (IVariable* var : vars) {
318 INumericDataInternal* numapi = var->data()->_commonInternal()->numericData();
319 if (!numapi)
320 ARCANE_FATAL("Variable '{0}' can not be synchronized because it is not a numeric data", var->name());
321 MutableMemoryView mem_view = numapi->memoryView();
322 all_datatype_size += mem_view.datatypeSize();
323 m_sync_buffer.setDataView(index, mem_view);
324 ++index;
325 }
326 }
327
328 // TODO: à passer en paramètre de la fonction
329 bool is_compare_sync = false;
330 m_sync_buffer.prepareSynchronize(all_datatype_size, is_compare_sync);
331
332 m_synchronize_implementation->beginSynchronize(&m_sync_buffer);
333 m_synchronize_implementation->endSynchronize(&m_sync_buffer);
334}
335
336/*---------------------------------------------------------------------------*/
337/*---------------------------------------------------------------------------*/
338
339/*---------------------------------------------------------------------------*/
340/*---------------------------------------------------------------------------*/
348{
349 public:
350
351 class Factory;
353
354 protected:
355
356 void compute() override {}
357 void beginSynchronize(IDataSynchronizeBuffer* buf) override;
358 void endSynchronize(IDataSynchronizeBuffer* buf) override;
359
360 private:
361
362 IParallelMng* m_parallel_mng = nullptr;
363 UniqueArray<Parallel::Request> m_all_requests;
364};
365
366/*---------------------------------------------------------------------------*/
367/*---------------------------------------------------------------------------*/
368
371{
372 public:
373
374 explicit Factory(IParallelMng* pm)
375 : m_parallel_mng(pm)
376 {}
377
378 Ref<IDataSynchronizeImplementation> createInstance() override
379 {
380 auto* x = new SimpleDataSynchronizeImplementation(this);
382 }
383
384 public:
385
386 IParallelMng* m_parallel_mng = nullptr;
387};
388
389/*---------------------------------------------------------------------------*/
390/*---------------------------------------------------------------------------*/
391
392SimpleDataSynchronizeImplementation::
393SimpleDataSynchronizeImplementation(Factory* f)
394: m_parallel_mng(f->m_parallel_mng)
395{
396}
397
398/*---------------------------------------------------------------------------*/
399/*---------------------------------------------------------------------------*/
400
402arcaneCreateSimpleVariableSynchronizerFactory(IParallelMng* pm)
403{
406}
407
408/*---------------------------------------------------------------------------*/
409/*---------------------------------------------------------------------------*/
410
411void SimpleDataSynchronizeImplementation::
412beginSynchronize(IDataSynchronizeBuffer* vs_buf)
413{
414 ARCANE_CHECK_POINTER(vs_buf);
415 IParallelMng* pm = m_parallel_mng;
416
417 const bool use_blocking_send = false;
418 Int32 nb_message = vs_buf->nbRank();
419
420 /*pm->traceMng()->info() << " ** ** COMMON BEGIN SYNC n=" << nb_message
421 << " this=" << (IVariableSynchronizeDispatcher*)this
422 << " m_sync_info=" << &this->m_sync_info;*/
423
424 // Envoie les messages de réception non bloquant
425 for (Integer i = 0; i < nb_message; ++i) {
426 Int32 target_rank = vs_buf->targetRank(i);
427 auto buf = _toLegacySmallView(vs_buf->receiveBuffer(i));
428 if (!buf.empty()) {
429 Parallel::Request rval = pm->recv(buf, target_rank, false);
430 m_all_requests.add(rval);
431 }
432 }
433
434 vs_buf->copyAllSend();
435
436 // Envoie les messages d'envoi en mode non bloquant.
437 for (Integer i = 0; i < nb_message; ++i) {
438 Int32 target_rank = vs_buf->targetRank(i);
439 auto buf = _toLegacySmallView(vs_buf->sendBuffer(i));
440
441 //ConstArrayView<SimpleType> const_share = share_local_buffer;
442 if (!buf.empty()) {
443 //for( Integer i=0, is=share_local_buffer.size(); i<is; ++i )
444 //trace->info() << "TO rank=" << vsi.m_target_rank << " I=" << i << " V=" << share_local_buffer[i]
445 // << " lid=" << share_grp[i] << " v2=" << var_values[share_grp[i]];
446 Parallel::Request rval = pm->send(buf, target_rank, use_blocking_send);
447 if (!use_blocking_send)
448 m_all_requests.add(rval);
449 }
450 }
451}
452
453/*---------------------------------------------------------------------------*/
454/*---------------------------------------------------------------------------*/
455
456void SimpleDataSynchronizeImplementation::
457endSynchronize(IDataSynchronizeBuffer* vs_buf)
458{
459 IParallelMng* pm = m_parallel_mng;
460
461 /*pm->traceMng()->info() << " ** ** COMMON END SYNC n=" << nb_message
462 << " this=" << (IVariableSynchronizeDispatcher*)this
463 << " m_sync_info=" << &this->m_sync_info;*/
464
465 // Attend que les réceptions se terminent
466 pm->waitAllRequests(m_all_requests);
467 m_all_requests.clear();
468
469 // Recopie dans la variable le message de retour.
470 vs_buf->copyAllReceive();
471}
472
473/*---------------------------------------------------------------------------*/
474/*---------------------------------------------------------------------------*/
475
476/*---------------------------------------------------------------------------*/
477/*---------------------------------------------------------------------------*/
478
479IDataSynchronizeMultiDispatcher* IDataSynchronizeMultiDispatcher::
480create(const DataSynchronizeDispatcherBuildInfo& bi)
481{
482 // TODO: Une fois qu'on aura supprimer l'ancien mécanisme, il faudra
483 // modifier l'API ne pas utiliser 'VariableCollection' mais une liste
484 // de \a INumericDataInternal
485 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCANE_USE_LEGACY_MULTISYNCHRONIZE", true))
486 if (v.value() >= 1)
487 return new DataSynchronizeMultiDispatcher(bi);
488 return new DataSynchronizeMultiDispatcherV2(bi);
489}
490
491/*---------------------------------------------------------------------------*/
492/*---------------------------------------------------------------------------*/
493
494} // namespace Arcane
495
496/*---------------------------------------------------------------------------*/
497/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCCORE_DEFINE_REFERENCE_COUNTED_INCLASS_METHODS()
Macro pour définir les méthodes gérant les compteurs de référence.
Gestionnaire d'exécution pour accélérateur.
Definition core/Runner.h:53
void _compute()
Notifie l'implémentation que les informations de synchronisation ont changé.
Infos pour construire un DataSynchronizeDispatcher.
Gestion de la synchronisation pour une donnée.
DataSynchronizeResult endSynchronize() override
Termine la synchronisation.
void setSynchronizeBuffer(Ref< MemoryBuffer > buffer) override
Positionne le buffer de synchronisation.
void compute() override
Recalcule les informations nécessaires après une mise à jour des informations de DataSynchronizeInfo.
SingleDataSynchronizeBuffer m_sync_buffer
Gère les buffers d'envoi et réception pour la synchronisation.
void beginSynchronize(INumericDataInternal *data, bool is_compare_sync) override
Commence l'exécution pour la synchronisation pour la donnée data.
Synchronisation d'une liste de variables.
void compute() override
Recalcule les informations nécessaires après une mise à jour des informations de DataSynchronizeInfo.
void setSynchronizeBuffer(Ref< MemoryBuffer > buffer) override
Positionne le buffer de synchronisation.
Synchronisation d'une liste de variables.
void compute() override
Recalcule les informations nécessaires après une mise à jour des informations de DataSynchronizeInfo.
void setSynchronizeBuffer(Ref< MemoryBuffer >) override
Positionne le buffer de synchronisation.
Informations sur le résultat d'une synchronisation.
Buffer générique pour la synchronisation de données.
Interface pour gérer la synchronisation d'une donnée.
Interface d'une fabrique dispatcher générique.
Interface de la synchronisation d'une liste de variables.
Interface pour un 'IData' d'un type numérique.
virtual MutableMemoryView memoryView()=0
Vue mémoire sur la donnée.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void recv(ArrayView< char > values, Int32 rank)=0
virtual void waitAllRequests(ArrayView< Request > rvalues)=0
Bloque en attendant que les requêtes rvalues soient terminées.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:120
Implémentation de IDataSynchronizeBuffer pour plusieurs données.
void prepareSynchronize(Int32 datatype_size, bool is_compare_sync) override
Prépare la synchronisation.
Implémentation basique de la sérialisation.
Implémentation de IDataSynchronizeBuffer pour une donnée.
DataSynchronizeResult finalizeSynchronize()
Termine la synchronisation.
void prepareSynchronize(Int32 datatype_size, bool is_compare_sync) override
Prépare la synchronisation.
Implémentation thread-safe d'un compteur de référence.
ITraceMng * traceMng() const
Gestionnaire de trace.
Ref< IParallelExchanger > createExchangerRef(IParallelMng *pm)
Retourne une interface pour transférer des messages entre rangs.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:640
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:515
unsigned char Byte
Type d'un octet.
Definition UtilsTypes.h:142
Int32 Integer
Type représentant un entier.