14#include "arcane/impl/internal/IDataSynchronizeDispatcher.h"
16#include "arcane/utils/FatalErrorException.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/MemoryView.h"
19#include "arcane/utils/ValueConvert.h"
20#include "arcane/utils/ITraceMng.h"
21#include "arcane/utils/internal/MemoryBuffer.h"
23#include "arcane/core/ParallelMngUtils.h"
24#include "arcane/core/IParallelExchanger.h"
25#include "arcane/core/ISerializeMessage.h"
26#include "arcane/core/ISerializer.h"
27#include "arcane/core/IParallelMng.h"
28#include "arcane/core/IData.h"
29#include "arcane/core/internal/IDataInternal.h"
31#include "arcane/impl/DataSynchronizeInfo.h"
32#include "arcane/impl/internal/DataSynchronizeBuffer.h"
33#include "arcane/impl/internal/IBufferCopier.h"
44 _toLegacySmallView(MutableMemoryView memory_view)
46 Span<std::byte> bytes = memory_view.bytes();
47 void* data = bytes.data();
48 Int32 size = bytes.smallView().size();
49 return { size,
reinterpret_cast<Byte*
>(data) };
69 Runner* m_runner =
nullptr;
81DataSynchronizeDispatcherBase::
83: m_parallel_mng(
bi.parallelMng())
84, m_sync_info(
bi.synchronizeInfo())
85, m_synchronize_implementation(
bi.synchronizeImplementation())
92DataSynchronizeDispatcherBase::
93~DataSynchronizeDispatcherBase()
106 m_synchronize_implementation->compute();
128 , m_sync_buffer(m_sync_info.get(),
bi.bufferCopier())
143 bool m_is_in_sync =
false;
144 bool m_is_empty_sync =
false;
162 ARCANE_FATAL(
"_beginSynchronize() has already been called");
165 m_is_empty_sync = (
mem_view.bytes().size() == 0);
170 m_synchronize_implementation->beginSynchronize(&
m_sync_buffer);
180 ARCANE_FATAL(
"No pending synchronize(). You need to call beginSynchronize() before");
182 if (!m_is_empty_sync) {
183 m_synchronize_implementation->endSynchronize(&
m_sync_buffer);
186 m_is_in_sync =
false;
216 : m_parallel_mng(
bi.parallelMng())
217 , m_sync_info(
bi.synchronizeInfo())
234void DataSynchronizeMultiDispatcher::
235synchronize(ConstArrayView<IVariable*> vars)
238 Integer nb_rank = m_sync_info->size();
240 for (Integer i = 0; i < nb_rank; ++i) {
241 Int32 rank = m_sync_info->targetRank(i);
242 exchanger->addSender(rank);
243 recv_ranks[i] = rank;
245 exchanger->initializeCommunicationsMessages(recv_ranks);
246 for (Integer i = 0; i < nb_rank; ++i) {
247 ISerializeMessage* msg = exchanger->messageToSend(i);
248 ISerializer* sbuf = msg->serializer();
250 sbuf->setMode(ISerializer::ModeReserve);
251 for (IVariable* var : vars) {
252 var->serialize(sbuf, share_ids,
nullptr);
254 sbuf->allocateBuffer();
256 for (IVariable* var : vars) {
257 var->serialize(sbuf, share_ids,
nullptr);
260 exchanger->processExchange();
261 for (Integer i = 0; i < nb_rank; ++i) {
262 ISerializeMessage* msg = exchanger->messageToReceive(i);
263 ISerializer* sbuf = msg->serializer();
266 for (IVariable* var : vars) {
267 var->serialize(sbuf, ghost_ids,
nullptr);
291 , m_sync_buffer(
bi.parallelMng()->
traceMng(), m_sync_info.get(),
bi.bufferCopier())
307void DataSynchronizeMultiDispatcherV2::
308synchronize(ConstArrayView<IVariable*> vars)
310 const Int32 nb_var = vars.size();
311 m_sync_buffer.setNbData(nb_var);
314 Int32 all_datatype_size = 0;
317 for (IVariable* var : vars) {
318 INumericDataInternal* numapi = var->data()->_commonInternal()->numericData();
320 ARCANE_FATAL(
"Variable '{0}' can not be synchronized because it is not a numeric data", var->name());
321 MutableMemoryView mem_view = numapi->memoryView();
322 all_datatype_size += mem_view.datatypeSize();
323 m_sync_buffer.setDataView(index, mem_view);
329 bool is_compare_sync =
false;
332 m_synchronize_implementation->beginSynchronize(&m_sync_buffer);
333 m_synchronize_implementation->endSynchronize(&m_sync_buffer);
356 void compute()
override {}
392SimpleDataSynchronizeImplementation::
393SimpleDataSynchronizeImplementation(
Factory* f)
394: m_parallel_mng(f->m_parallel_mng)
402arcaneCreateSimpleVariableSynchronizerFactory(
IParallelMng* pm)
411void SimpleDataSynchronizeImplementation::
412beginSynchronize(IDataSynchronizeBuffer* vs_buf)
415 IParallelMng* pm = m_parallel_mng;
417 const bool use_blocking_send =
false;
418 Int32 nb_message = vs_buf->nbRank();
425 for (Integer i = 0; i < nb_message; ++i) {
426 Int32 target_rank = vs_buf->targetRank(i);
427 auto buf = _toLegacySmallView(vs_buf->receiveBuffer(i));
429 Parallel::Request rval = pm->
recv(buf, target_rank,
false);
430 m_all_requests.add(rval);
434 vs_buf->copyAllSend();
437 for (Integer i = 0; i < nb_message; ++i) {
438 Int32 target_rank = vs_buf->targetRank(i);
439 auto buf = _toLegacySmallView(vs_buf->sendBuffer(i));
446 Parallel::Request rval = pm->send(buf, target_rank, use_blocking_send);
447 if (!use_blocking_send)
448 m_all_requests.add(rval);
456void SimpleDataSynchronizeImplementation::
457endSynchronize(IDataSynchronizeBuffer* vs_buf)
459 IParallelMng* pm = m_parallel_mng;
467 m_all_requests.clear();
470 vs_buf->copyAllReceive();
479IDataSynchronizeMultiDispatcher* IDataSynchronizeMultiDispatcher::
480create(
const DataSynchronizeDispatcherBuildInfo& bi)
485 if (
auto v = Convert::Type<Int32>::tryParseFromEnvironment(
"ARCANE_USE_LEGACY_MULTISYNCHRONIZE",
true))
487 return new DataSynchronizeMultiDispatcher(bi);
488 return new DataSynchronizeMultiDispatcherV2(bi);
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCCORE_DEFINE_REFERENCE_COUNTED_INCLASS_METHODS()
Macro pour définir les méthodes gérant les compteurs de référence.
Gestionnaire d'exécution pour accélérateur.
void _compute()
Notifie l'implémentation que les informations de synchronisation ont changé.
Infos pour construire un DataSynchronizeDispatcher.
Gestion de la synchronisation pour une donnée.
DataSynchronizeResult endSynchronize() override
Termine la synchronisation.
void setSynchronizeBuffer(Ref< MemoryBuffer > buffer) override
Positionne le buffer de synchronisation.
void compute() override
Recalcule les informations nécessaires après une mise à jour des informations de DataSynchronizeInfo.
SingleDataSynchronizeBuffer m_sync_buffer
Gère les buffers d'envoi et réception pour la synchronisation.
void beginSynchronize(INumericDataInternal *data, bool is_compare_sync) override
Commence l'exécution pour la synchronisation pour la donnée data.
Synchronisation d'une liste de variables.
void compute() override
Recalcule les informations nécessaires après une mise à jour des informations de DataSynchronizeInfo.
void setSynchronizeBuffer(Ref< MemoryBuffer > buffer) override
Positionne le buffer de synchronisation.
Synchronisation d'une liste de variables.
void compute() override
Recalcule les informations nécessaires après une mise à jour des informations de DataSynchronizeInfo.
void setSynchronizeBuffer(Ref< MemoryBuffer >) override
Positionne le buffer de synchronisation.
Informations sur le résultat d'une synchronisation.
Buffer générique pour la synchronisation de données.
Interface pour gérer la synchronisation d'une donnée.
Interface d'une fabrique dispatcher générique.
Interface de la synchronisation d'une liste de variables.
Interface pour un 'IData' d'un type numérique.
virtual MutableMemoryView memoryView()=0
Vue mémoire sur la donnée.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void recv(ArrayView< char > values, Int32 rank)=0
virtual void waitAllRequests(ArrayView< Request > rvalues)=0
Bloque en attendant que les requêtes rvalues soient terminées.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Implémentation de IDataSynchronizeBuffer pour plusieurs données.
void prepareSynchronize(Int32 datatype_size, bool is_compare_sync) override
Prépare la synchronisation.
Implémentation basique de la sérialisation.
Implémentation de IDataSynchronizeBuffer pour une donnée.
DataSynchronizeResult finalizeSynchronize()
Termine la synchronisation.
void prepareSynchronize(Int32 datatype_size, bool is_compare_sync) override
Prépare la synchronisation.
@ ModePut
Le sérialiseur attend des reserve()
@ ModeGet
Le sérialiseur attend des get()
Implémentation thread-safe d'un compteur de référence.
ITraceMng * traceMng() const
Gestionnaire de trace.
Ref< IParallelExchanger > createExchangerRef(IParallelMng *pm)
Retourne une interface pour transférer des messages entre rangs.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
unsigned char Byte
Type d'un octet.
Int32 Integer
Type représentant un entier.