14#include "arcane/impl/internal/VariableSynchronizer.h"
16#include "arcane/utils/PlatformUtils.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/ITraceMng.h"
20#include "arcane/utils/ValueConvert.h"
22#include "arcane/utils/internal/MemoryBuffer.h"
24#include "arcane/core/VariableSynchronizerEventArgs.h"
25#include "arcane/core/IParallelMng.h"
26#include "arcane/core/IItemFamily.h"
27#include "arcane/core/ItemPrinter.h"
28#include "arcane/core/IVariable.h"
29#include "arcane/core/IData.h"
30#include "arcane/core/VariableCollection.h"
31#include "arcane/core/Timer.h"
32#include "arcane/core/IMesh.h"
33#include "arcane/core/IVariableMng.h"
34#include "arcane/core/IVariableSynchronizerMng.h"
35#include "arcane/core/parallel/IStat.h"
36#include "arcane/core/internal/IDataInternal.h"
37#include "arcane/core/internal/IParallelMngInternal.h"
38#include "arcane/core/internal/IVariableSynchronizerMngInternal.h"
40#include "arcane/impl/DataSynchronizeInfo.h"
41#include "arcane/impl/internal/VariableSynchronizerComputeList.h"
42#include "arcane/impl/internal/IBufferCopier.h"
56arcaneCreateSimpleVariableSynchronizerFactory(
IParallelMng* pm);
74 : m_synchronizer_mng(sync_mng)
75 , m_allocator(allocator)
76 , m_buffer(sync_mng->createSynchronizeBuffer(allocator))
79 ~ScopedBuffer()
noexcept(
false)
81 m_synchronizer_mng->releaseSynchronizeBuffer(m_allocator, m_buffer.get());
95 : m_variable_synchronizer_mng(var_syncer->synchronizeMng())
98 , m_event_args(var_syncer)
99 , m_allocator(allocator)
103 if (!m_multi_dispatcher)
108 delete m_multi_dispatcher;
115 m_dispatcher->compute();
119 void initialize(IVariable* var)
122 m_event_args.initialize(var);
126 void initialize(
const VariableCollection& vars)
129 m_event_args.initialize(vars);
130 for (VariableCollectionEnumerator v(vars); ++v;)
134 Int32 nbVariable()
const {
return m_variables.size(); }
135 ConstArrayView<IVariable*> variables()
const {
return m_variables; }
140 Int32 nb_var = m_variables.size();
144 bool is_compare_sync = m_variable_synchronizer_mng->isSynchronizationComparisonEnabled();
145 m_synchronize_result = synchronizeData(m_data_list[0], is_compare_sync);
148 ScopedBuffer tmp_buf(m_variable_synchronizer_mng->_internalApi(), m_allocator);
149 m_multi_dispatcher->setSynchronizeBuffer(tmp_buf.m_buffer);
150 m_multi_dispatcher->synchronize(m_variables);
153 var->setIsSynchronized();
158 ScopedBuffer tmp_buf(m_variable_synchronizer_mng->_internalApi(), m_allocator);
159 m_dispatcher->setSynchronizeBuffer(tmp_buf.m_buffer);
160 m_dispatcher->beginSynchronize(data, is_compare_sync);
161 return m_dispatcher->endSynchronize();
164 VariableSynchronizerEventArgs& eventArgs() {
return m_event_args; }
168 IVariableSynchronizerMng* m_variable_synchronizer_mng =
nullptr;
169 Ref<IDataSynchronizeDispatcher> m_dispatcher;
170 IDataSynchronizeMultiDispatcher* m_multi_dispatcher =
nullptr;
171 VariableSynchronizerEventArgs m_event_args;
172 UniqueArray<IVariable*> m_variables;
173 UniqueArray<INumericDataInternal*> m_data_list;
174 DataSynchronizeResult m_synchronize_result;
175 IMemoryAllocator* m_allocator =
nullptr;
185 void _addVariable(IVariable* var)
187 INumericDataInternal* numapi = var->data()->_commonInternal()->numericData();
189 ARCANE_FATAL(
"Variable '{0}' can not be synchronized because it is not a numeric data", var->name());
190 m_variables.add(var);
191 m_data_list.add(numapi);
198VariableSynchronizer::
205 m_sync_info = DataSynchronizeInfo::create();
206 m_partial_sync_info = DataSynchronizeInfo::create();
208 if (!implementation_factory.get())
209 implementation_factory = arcaneCreateSimpleVariableSynchronizerFactory(pm);
210 m_implementation_factory = implementation_factory;
212 m_variable_synchronizer_mng = group.itemFamily()->mesh()->variableMng()->synchronizerMng();
216 if (s ==
"0" || s ==
"FALSE" || s ==
"false")
217 m_allow_multi_sync =
false;
221 if (s ==
"1" || s ==
"TRUE" || s ==
"true")
227 m_is_check_coherence = (v.value() != 0);
229 m_default_message = _buildMessage();
236VariableSynchronizer::
237~VariableSynchronizer()
240 delete m_default_message;
246VariableSynchronizer::SyncMessage* VariableSynchronizer::
249 auto* internal_pm = m_parallel_mng->_internalApi();
250 Runner runner = internal_pm->runner();
251 bool is_accelerator_aware = internal_pm->isAcceleratorAware();
253 if (runner.isInitialized() && is_accelerator_aware) {
257 return _buildMessage(m_sync_info);
263VariableSynchronizer::SyncMessage* VariableSynchronizer::
264_buildMessage(Ref<DataSynchronizeInfo>& sync_info)
266 GroupIndexTable* table =
nullptr;
267 if (!m_item_group.isAllItems())
268 table = m_item_group.localIdToIndex().get();
270 Ref<IBufferCopier> buffer_copier;
276 auto* internal_pm = m_parallel_mng->_internalApi();
278 IMemoryAllocator* allocator =
nullptr;
282 if (m_runner.isInitialized()) {
283 buffer_copier->setRunQueue(internal_pm->queue());
288 Ref<IDataSynchronizeImplementation> sync_impl = m_implementation_factory->createInstance();
289 sync_impl->setDataSynchronizeInfo(sync_info.get());
291 DataSynchronizeDispatcherBuildInfo bi(m_parallel_mng, sync_impl, sync_info, buffer_copier);
304 VariableSynchronizerComputeList computer(
this);
308 m_default_message->compute();
316void VariableSynchronizer::
317_doSynchronize(SyncMessage* message)
327 _sendBeginEvent(event_args);
331 message->synchronize();
334 Int32 nb_var = message->nbVariable();
337 if (nb_var == 1 && m_variable_synchronizer_mng->isSynchronizationComparisonEnabled()) {
348 _sendEndEvent(event_args);
354void VariableSynchronizer::
360 if (local_ids == m_partial_local_ids.constView()) {
367 m_partial_local_ids.copy(local_ids);
369 UniqueArray<bool> flags(m_item_group.itemFamily()->maxLocalId());
372 for (
Int32 lid : local_ids) {
379 const DataSynchronizeBufferInfoList& send_info = m_sync_info->sendInfo();
380 const DataSynchronizeBufferInfoList& recv_info = m_sync_info->receiveInfo();
382 m_partial_sync_info = DataSynchronizeInfo::create();
384 if (!local_ids.empty()) {
386 UniqueArray<Int32> recv_grp;
387 UniqueArray<Int32> send_grp;
389 for (
Int32 index = 0; index < nb_comm_ranks; ++index) {
390 Int32 target_rank = comm_ranks[index];
391 ConstArrayView<Int32> send_lids = send_info.localIds(index);
392 ConstArrayView<Int32> recv_lids = recv_info.localIds(index);
397 for (
Int32 lid : recv_lids) {
403 for (
Int32 lid : send_lids) {
409 if ((!send_grp.empty()) || (!recv_grp.empty())) {
411 m_partial_sync_info->add(VariableSyncInfo(send_grp, recv_grp, target_rank));
416 m_partial_sync_info->recompute();
418 m_partial_message->compute();
424void VariableSynchronizer::
427 message->initialize(var);
429 IParallelMng* pm = m_parallel_mng;
430 if (m_is_check_coherence)
432 debug(
Trace::High) <<
" Proc " << pm->commRank() <<
" Sync variable " << var->fullName();
434 info() <<
" Synchronize variable " << var->fullName()
437 _doSynchronize(message);
446 _synchronize(var, m_default_message);
455 _rebuildMessage(local_ids);
456 _synchronize(var, m_partial_message.get());
468 const bool use_multi = m_allow_multi_sync;
470 _synchronizeMulti(vars, m_default_message);
473 for (VariableCollection::Enumerator ivar(vars); ++ivar;) {
474 _synchronize(*ivar, m_default_message);
488 _rebuildMessage(local_ids);
490 const bool use_multi = m_allow_multi_sync;
492 _synchronizeMulti(vars, m_partial_message.get());
495 for (VariableCollection::Enumerator ivar(vars); ++ivar;) {
496 _synchronize(*ivar, m_partial_message.get());
507 return m_default_message->synchronizeData(data, is_compare_sync);
519 ARCANE_FATAL(
"Data can not be synchronized because it is not a numeric data");
520 _synchronize(numapi,
false);
529 info(4) <<
"** VariableSynchronizer::changeLocalIds() group=" << m_item_group.name();
530 m_sync_info->changeLocalIds(old_to_new_ids);
531 m_default_message->compute();
549 if (vars.count() == 1)
553 for (VariableCollection::Enumerator ivar(vars); ++ivar;) {
562 if (group != var_group)
571void VariableSynchronizer::
574 message->initialize(vars);
579 info() <<
" MultiSynchronize"
583 _doSynchronize(message);
592 return m_sync_info->communicatingRanks();
601 return m_sync_info->sendInfo().localIds(index);
610 return m_sync_info->receiveInfo().localIds(index);
616void VariableSynchronizer::
620 args.setState(VariableSynchronizerEventArgs::State::BeginSynchronize);
627void VariableSynchronizer::
628_sendEndEvent(VariableSynchronizerEventArgs& args)
632 m_parallel_mng->
stat()->
add(
"Synchronize", elapsed_time, 1);
633 args.setState(VariableSynchronizerEventArgs::State::EndSynchronize);
634 args.setElapsedTime(elapsed_time);
641void VariableSynchronizer::
642_sendEvent(VariableSynchronizerEventArgs& args)
644 m_variable_synchronizer_mng->onSynchronized().notify(args);
645 m_on_synchronized.notify(args);
651void VariableSynchronizer::
655 m_sync_timer =
new Timer(m_parallel_mng->timerMng(),
"SyncTimer",
Timer::TimerReal);
671 if (m_runner.isInitialized())
672 m_runner.setAsCurrentDevice();
#define ARCANE_CHECK_POINTER(ptr)
Macro returning the pointer ptr if it is not null or throwing an exception if it is null.
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
Memory and allocator management functions.
constexpr Integer size() const noexcept
Number of elements in the array.
static std::optional< Int32 > tryParseFromEnvironment(StringView s, bool throw_if_invalid)
Info to build a DataSynchronizeDispatcher.
Information about the result of a synchronization.
virtual INumericDataInternal * numericData()
Generic interface for numeric data (nullptr if the data is not numeric).
Interface to manage the synchronization of a data item.
Interface for synchronizing a list of variables.
virtual void compute()=0
Recalculates the necessary information after an update to the DataSynchronizeInfo.
Interface of a data item.
virtual IDataInternal * _commonInternal()=0
Interface for a memory allocator.
Interface for an 'IData' of a numeric type.
Interface of the parallelism manager for a subdomain.
virtual Int32 commRank() const =0
Rank of this instance in the communicator.
virtual ITimeStats * timeStats() const =0
Associated statistics manager (can be null).
virtual IStat * stat()=0
Statistics manager.
Interface managing execution time statistics.
Internal Arcane API for IVariableSynchronizerMng.
virtual bool isPartial() const =0
Indicates if the variable is partial.
virtual ItemGroup itemGroup() const =0
Associated mesh group.
virtual void add(const String &name, double elapsed_time, Int64 msg_size)=0
Adds a statistic.
Reference to an instance.
Positions the phase of the currently executing action.
Sentinel for the timer. The sentinel associated with a timer allows it to be triggered upon its const...
@ TimerReal
Timer using real time.
Real lastActivationTime() const
Returns the time (in seconds) spent during the last activation of the timer.
TraceMessageDbg debug(Trace::eDebugLevel=Trace::Medium) const
Flow for a debug message.
TraceMessage info() const
Flow for an information message.
void compute()
Creation of the synchronization element list.
Arguments for the event notifying a variable synchronization.
@ Different
Different values before and after synchronization.
@ Same
Same values before and after synchronization.
void setCompareStatus(Int32 i, CompareStatus v)
Sets the comparison status of the i-th variable.
Synchronization management.
void synchronize()
Performs the synchronization.
void changeLocalIds(Int32ConstArrayView old_to_new_ids) override
Called when the local IDs of the entities are modified.
void compute() override
Creation of the list of synchronization elements.
bool _canSynchronizeMulti(const VariableCollection &vars)
Indicates if the variables in the list vars can be synchronized at once.
Int32ConstArrayView ghostItems(Int32 index) override
List of local IDs of ghost entities with a subdomain.
Int32ConstArrayView sharedItems(Int32 index) override
List of local IDs of entities shared with a subdomain.
void _setCurrentDevice()
Positions the device associated with our RunQueue as the current device.
void synchronizeData(IData *data) override
Synchronizes the data data.
void synchronize(IVariable *var) override
Synchronizes the variable var in blocking mode.
Int32ConstArrayView communicatingRanks() override
Ranks of subdomains with which communication occurs.
IMemoryAllocator * getAllocator(eMemoryResource mem_resource)
Default allocator for the resource mem_resource.
void namedBarrier(IParallelMng *pm, const String &name)
Performs a named barrier with name name.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
eDataSynchronizeCompareStatus
Comparison of ghost entity values before/after synchronization.
@ Different
Different values before and after synchronization.
@ Same
Same values before and after synchronization.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.