14#include "arcane/impl/internal/IDataSynchronizeDispatcher.h"
16#include "arcane/utils/FatalErrorException.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/MemoryView.h"
19#include "arcane/utils/ValueConvert.h"
20#include "arcane/utils/ITraceMng.h"
21#include "arcane/utils/internal/MemoryBuffer.h"
23#include "arcane/core/ParallelMngUtils.h"
24#include "arcane/core/IParallelExchanger.h"
25#include "arcane/core/ISerializeMessage.h"
26#include "arcane/core/ISerializer.h"
27#include "arcane/core/IParallelMng.h"
28#include "arcane/core/IData.h"
29#include "arcane/core/internal/IDataInternal.h"
31#include "arcane/impl/DataSynchronizeInfo.h"
32#include "arcane/impl/internal/DataSynchronizeBuffer.h"
33#include "arcane/impl/internal/IBufferCopier.h"
44 _toLegacySmallView(MutableMemoryView memory_view)
47 void* data = bytes.
data();
48 Int32 size = bytes.smallView().size();
49 return { size,
reinterpret_cast<Byte*
>(data) };
59class DataSynchronizeDispatcherBase
64 ~DataSynchronizeDispatcherBase();
69 Runner* m_runner =
nullptr;
81DataSynchronizeDispatcherBase::
83: m_parallel_mng(bi.parallelMng())
84, m_sync_info(bi.synchronizeInfo())
85, m_synchronize_implementation(bi.synchronizeImplementation())
92DataSynchronizeDispatcherBase::
93~DataSynchronizeDispatcherBase()
106 m_synchronize_implementation->compute();
117class ARCANE_IMPL_EXPORT DataSynchronizeDispatcher
119,
public DataSynchronizeDispatcherBase
127 : DataSynchronizeDispatcherBase(bi)
143 bool m_is_in_sync =
false;
144 bool m_is_empty_sync =
false;
158 MutableMemoryView mem_view = data->
memoryView();
159 Int32 full_datatype_size = mem_view.datatypeSize();
162 ARCANE_FATAL(
"_beginSynchronize() has already been called");
165 m_is_empty_sync = (mem_view.bytes().size() == 0);
169 m_sync_buffer.prepareSynchronize(full_datatype_size, is_compare_sync);
170 m_synchronize_implementation->beginSynchronize(&
m_sync_buffer);
180 ARCANE_FATAL(
"No pending synchronize(). You need to call beginSynchronize() before");
182 if (!m_is_empty_sync) {
183 m_synchronize_implementation->endSynchronize(&
m_sync_buffer);
186 m_is_in_sync =
false;
210class ARCANE_IMPL_EXPORT DataSynchronizeMultiDispatcher
216 : m_parallel_mng(bi.parallelMng())
217 , m_sync_info(bi.synchronizeInfo())
234void DataSynchronizeMultiDispatcher::
235synchronize(ConstArrayView<IVariable*> vars)
238 Integer nb_rank = m_sync_info->size();
240 for (
Integer i = 0; i < nb_rank; ++i) {
241 Int32 rank = m_sync_info->targetRank(i);
242 exchanger->addSender(rank);
243 recv_ranks[i] = rank;
245 exchanger->initializeCommunicationsMessages(recv_ranks);
246 for (
Integer i = 0; i < nb_rank; ++i) {
247 ISerializeMessage* msg = exchanger->messageToSend(i);
248 ISerializer* sbuf = msg->serializer();
250 sbuf->setMode(ISerializer::ModeReserve);
251 for (IVariable* var : vars) {
252 var->serialize(sbuf, share_ids,
nullptr);
254 sbuf->allocateBuffer();
256 for (IVariable* var : vars) {
257 var->serialize(sbuf, share_ids,
nullptr);
260 exchanger->processExchange();
261 for (
Integer i = 0; i < nb_rank; ++i) {
262 ISerializeMessage* msg = exchanger->messageToReceive(i);
263 ISerializer* sbuf = msg->serializer();
266 for (IVariable* var : vars) {
267 var->serialize(sbuf, ghost_ids,
nullptr);
283class ARCANE_IMPL_EXPORT DataSynchronizeMultiDispatcherV2
284:
public DataSynchronizeDispatcherBase
290 : DataSynchronizeDispatcherBase(bi)
291 , m_sync_buffer(bi.parallelMng()->
traceMng(), m_sync_info.get(), bi.bufferCopier())
307void DataSynchronizeMultiDispatcherV2::
308synchronize(ConstArrayView<IVariable*> vars)
310 const Int32 nb_var = vars.size();
311 m_sync_buffer.setNbData(nb_var);
314 Int32 all_datatype_size = 0;
317 for (IVariable* var : vars) {
318 INumericDataInternal* numapi = var->data()->_commonInternal()->numericData();
320 ARCANE_FATAL(
"Variable '{0}' can not be synchronized because it is not a numeric data", var->name());
321 MutableMemoryView mem_view = numapi->memoryView();
322 all_datatype_size += mem_view.datatypeSize();
323 m_sync_buffer.setDataView(index, mem_view);
329 bool is_compare_sync =
false;
330 m_sync_buffer.prepareSynchronize(all_datatype_size, is_compare_sync);
332 m_synchronize_implementation->beginSynchronize(&m_sync_buffer);
333 m_synchronize_implementation->endSynchronize(&m_sync_buffer);
346class SimpleDataSynchronizeImplementation
352 explicit SimpleDataSynchronizeImplementation(
Factory* f);
356 void compute()
override {}
380 auto* x =
new SimpleDataSynchronizeImplementation(
this);
392SimpleDataSynchronizeImplementation::
393SimpleDataSynchronizeImplementation(
Factory* f)
394: m_parallel_mng(f->m_parallel_mng)
402arcaneCreateSimpleVariableSynchronizerFactory(
IParallelMng* pm)
411void SimpleDataSynchronizeImplementation::
412beginSynchronize(IDataSynchronizeBuffer* vs_buf)
415 IParallelMng* pm = m_parallel_mng;
417 const bool use_blocking_send =
false;
418 Int32 nb_message = vs_buf->nbRank();
425 for (
Integer i = 0; i < nb_message; ++i) {
426 Int32 target_rank = vs_buf->targetRank(i);
427 auto buf = _toLegacySmallView(vs_buf->receiveBuffer(i));
429 Parallel::Request rval = pm->recv(buf, target_rank,
false);
430 m_all_requests.add(rval);
434 vs_buf->copyAllSend();
437 for (
Integer i = 0; i < nb_message; ++i) {
438 Int32 target_rank = vs_buf->targetRank(i);
439 auto buf = _toLegacySmallView(vs_buf->sendBuffer(i));
446 Parallel::Request rval = pm->send(buf, target_rank, use_blocking_send);
447 if (!use_blocking_send)
448 m_all_requests.add(rval);
456void SimpleDataSynchronizeImplementation::
457endSynchronize(IDataSynchronizeBuffer* vs_buf)
459 IParallelMng* pm = m_parallel_mng;
467 m_all_requests.clear();
470 vs_buf->copyAllReceive();
479IDataSynchronizeMultiDispatcher* IDataSynchronizeMultiDispatcher::
480create(
const DataSynchronizeDispatcherBuildInfo& bi)
487 return new DataSynchronizeMultiDispatcher(bi);
488 return new DataSynchronizeMultiDispatcherV2(bi);
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCCORE_DEFINE_REFERENCE_COUNTED_INCLASS_METHODS()
Macro pour définir les méthodes gérant les compteurs de référence.
Gestionnaire d'exécution pour accélérateur.
Vue modifiable d'un tableau d'un type T.
Vue constante d'un tableau de type T.
static std::optional< Int32 > tryParseFromEnvironment(StringView s, bool throw_if_invalid)
void _compute()
Notifie l'implémentation que les informations de synchronisation ont changé.
Infos pour construire un DataSynchronizeDispatcher.
Gestion de la synchronisation pour une donnée.
DataSynchronizeResult endSynchronize() override
Termine la synchronisation.
void setSynchronizeBuffer(Ref< MemoryBuffer > buffer) override
Positionne le buffer de synchronisation.
void compute() override
Recalcule les informations nécessaires après une mise à jour des informations de DataSynchronizeInfo.
SingleDataSynchronizeBuffer m_sync_buffer
Gère les buffers d'envoi et réception pour la synchronisation.
void beginSynchronize(INumericDataInternal *data, bool is_compare_sync) override
Commence l'exécution pour la synchronisation pour la donnée data.
void compute() override
Recalcule les informations nécessaires après une mise à jour des informations de DataSynchronizeInfo.
void setSynchronizeBuffer(Ref< MemoryBuffer > buffer) override
Positionne le buffer de synchronisation.
void compute() override
Recalcule les informations nécessaires après une mise à jour des informations de DataSynchronizeInfo.
void setSynchronizeBuffer(Ref< MemoryBuffer >) override
Positionne le buffer de synchronisation.
Informations sur le résultat d'une synchronisation.
Buffer générique pour la synchronisation de données.
Interface pour gérer la synchronisation d'une donnée.
Interface d'une fabrique dispatcher générique.
Interface de la synchronisation d'une liste de variables.
Interface pour un 'IData' d'un type numérique.
virtual MutableMemoryView memoryView()=0
Vue mémoire sur la donnée.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual ITraceMng * traceMng() const =0
Gestionnaire de traces.
virtual void waitAllRequests(ArrayView< Request > rvalues)=0
Bloque en attendant que les requêtes rvalues soient terminées.
@ ModePut
Le sérialiseur attend des reserve()
@ ModeGet
Le sérialiseur attend des get()
Implémentation de IDataSynchronizeBuffer pour plusieurs données.
Référence à une instance.
Implémentation thread-safe d'un compteur de référence.
Implémentation de IDataSynchronizeBuffer pour une donnée.
constexpr __host__ __device__ pointer data() const noexcept
Pointeur sur le début de la vue.
Vue d'un tableau d'éléments de type T.
Vecteur 1D de données avec sémantique par valeur (style STL).
Ref< IParallelExchanger > createExchangerRef(IParallelMng *pm)
Retourne une interface pour transférer des messages entre rangs.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Int32 Integer
Type représentant un entier.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
unsigned char Byte
Type d'un octet.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.