Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
DataSynchronizeDispatcher.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* DataSynchronizeDispatcher.cc (C) 2000-2025 */
9/* */
10/* Management of synchronization for an 'IData' instance. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/impl/internal/IDataSynchronizeDispatcher.h"
15
16#include "arcane/utils/FatalErrorException.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/MemoryView.h"
19#include "arcane/utils/ValueConvert.h"
20#include "arcane/utils/ITraceMng.h"
21#include "arcane/utils/internal/MemoryBuffer.h"
22
23#include "arcane/core/ParallelMngUtils.h"
24#include "arcane/core/IParallelExchanger.h"
25#include "arcane/core/ISerializeMessage.h"
26#include "arcane/core/ISerializer.h"
27#include "arcane/core/IParallelMng.h"
28#include "arcane/core/IData.h"
29#include "arcane/core/internal/IDataInternal.h"
30
31#include "arcane/impl/DataSynchronizeInfo.h"
32#include "arcane/impl/internal/DataSynchronizeBuffer.h"
33
34/*---------------------------------------------------------------------------*/
35/*---------------------------------------------------------------------------*/
36
37namespace Arcane
38{
39
40namespace
41{
43 _toLegacySmallView(MutableMemoryView memory_view)
44 {
45 Span<std::byte> bytes = memory_view.bytes();
46 void* data = bytes.data();
47 Int32 size = bytes.smallView().size();
48 return { size, reinterpret_cast<Byte*>(data) };
49 }
50} // namespace
51
52/*---------------------------------------------------------------------------*/
53/*---------------------------------------------------------------------------*/
54
55/*---------------------------------------------------------------------------*/
56/*---------------------------------------------------------------------------*/
57
58class DataSynchronizeDispatcherBase
59{
60 public:
61
62 explicit DataSynchronizeDispatcherBase(const DataSynchronizeDispatcherBuildInfo& bi);
63 ~DataSynchronizeDispatcherBase();
64
65 protected:
66
67 IParallelMng* m_parallel_mng = nullptr;
68 Runner* m_runner = nullptr;
69 Ref<DataSynchronizeInfo> m_sync_info;
70 Ref<IDataSynchronizeImplementation> m_synchronize_implementation;
71
72 protected:
73
74 void _compute();
75};
76
77/*---------------------------------------------------------------------------*/
78/*---------------------------------------------------------------------------*/
79
80DataSynchronizeDispatcherBase::
81DataSynchronizeDispatcherBase(const DataSynchronizeDispatcherBuildInfo& bi)
82: m_parallel_mng(bi.parallelMng())
83, m_sync_info(bi.synchronizeInfo())
84, m_synchronize_implementation(bi.synchronizeImplementation())
85{
86}
87
88/*---------------------------------------------------------------------------*/
89/*---------------------------------------------------------------------------*/
90
91DataSynchronizeDispatcherBase::
92~DataSynchronizeDispatcherBase()
93{
94}
95
96/*---------------------------------------------------------------------------*/
97/*---------------------------------------------------------------------------*/
98
104_compute()
105{
106 m_synchronize_implementation->compute();
107}
108
109/*---------------------------------------------------------------------------*/
110/*---------------------------------------------------------------------------*/
111
112/*---------------------------------------------------------------------------*/
113/*---------------------------------------------------------------------------*/
114
118class ARCANE_IMPL_EXPORT DataSynchronizeDispatcher
119: private ReferenceCounterImpl
120, public DataSynchronizeDispatcherBase
122{
124
125 public:
126
127 explicit DataSynchronizeDispatcher(const DataSynchronizeDispatcherBuildInfo& bi)
128 : DataSynchronizeDispatcherBase(bi)
129 , m_sync_buffer(bi.parallelMng()->traceMng(), m_sync_info.get(), bi.bufferCopier())
130 {
131 }
132
133 public:
134
135 void compute() override { _compute(); }
136 void setSynchronizeBuffer(Ref<MemoryBuffer> buffer) override { m_sync_buffer.setSynchronizeBuffer(buffer); }
137 void beginSynchronize(INumericDataInternal* data, bool is_compare_sync) override;
138 DataSynchronizeResult endSynchronize() override;
139
140 private:
141
144 bool m_is_in_sync = false;
145 bool m_is_empty_sync = false;
146};
147
148/*---------------------------------------------------------------------------*/
149/*---------------------------------------------------------------------------*/
150
151/*---------------------------------------------------------------------------*/
152/*---------------------------------------------------------------------------*/
153
155beginSynchronize(INumericDataInternal* data, bool is_compare_sync)
156{
158
159 MutableMemoryView mem_view = data->memoryView();
160
161 if (m_is_in_sync)
162 ARCANE_FATAL("_beginSynchronize() has already been called");
163 m_is_in_sync = true;
164
165 m_is_empty_sync = (mem_view.bytes().size() == 0);
166 if (m_is_empty_sync)
167 return;
168 m_sync_buffer.setDataView(mem_view);
169 m_sync_buffer.prepareSynchronize(is_compare_sync);
170 m_synchronize_implementation->beginSynchronize(&m_sync_buffer);
171}
172
173/*---------------------------------------------------------------------------*/
174/*---------------------------------------------------------------------------*/
175
178{
179 if (!m_is_in_sync)
180 ARCANE_FATAL("No pending synchronize(). You need to call beginSynchronize() before");
182 if (!m_is_empty_sync) {
183 m_synchronize_implementation->endSynchronize(&m_sync_buffer);
184 result = m_sync_buffer.finalizeSynchronize();
185 }
186 m_is_in_sync = false;
187 return result;
188}
189
190/*---------------------------------------------------------------------------*/
191/*---------------------------------------------------------------------------*/
192
193/*---------------------------------------------------------------------------*/
194/*---------------------------------------------------------------------------*/
195
196Ref<IDataSynchronizeDispatcher> IDataSynchronizeDispatcher::
197create(const DataSynchronizeDispatcherBuildInfo& build_info)
198{
200}
201
202/*---------------------------------------------------------------------------*/
203/*---------------------------------------------------------------------------*/
204
205/*---------------------------------------------------------------------------*/
206/*---------------------------------------------------------------------------*/
207
211class ARCANE_IMPL_EXPORT DataSynchronizeMultiDispatcher
213{
214 public:
215
216 explicit DataSynchronizeMultiDispatcher(const DataSynchronizeDispatcherBuildInfo& bi)
217 : m_parallel_mng(bi.parallelMng())
218 , m_sync_info(bi.synchronizeInfo())
219 {
220 }
221
222 void compute() override {}
224 void synchronize(ConstArrayView<IVariable*> vars) override;
225
226 private:
227
228 IParallelMng* m_parallel_mng = nullptr;
229 Ref<DataSynchronizeInfo> m_sync_info;
230};
231
232/*---------------------------------------------------------------------------*/
233/*---------------------------------------------------------------------------*/
234
235void DataSynchronizeMultiDispatcher::
236synchronize(ConstArrayView<IVariable*> vars)
237{
238 Ref<IParallelExchanger> exchanger{ ParallelMngUtils::createExchangerRef(m_parallel_mng) };
239 Integer nb_rank = m_sync_info->size();
240 Int32UniqueArray recv_ranks(nb_rank);
241 for (Integer i = 0; i < nb_rank; ++i) {
242 Int32 rank = m_sync_info->targetRank(i);
243 exchanger->addSender(rank);
244 recv_ranks[i] = rank;
245 }
246 exchanger->initializeCommunicationsMessages(recv_ranks);
247 for (Integer i = 0; i < nb_rank; ++i) {
248 ISerializeMessage* msg = exchanger->messageToSend(i);
249 ISerializer* sbuf = msg->serializer();
250 Int32ConstArrayView share_ids = m_sync_info->sendInfo().localIds(i);
251 sbuf->setMode(ISerializer::ModeReserve);
252 for (IVariable* var : vars) {
253 var->serialize(sbuf, share_ids, nullptr);
254 }
255 sbuf->allocateBuffer();
256 sbuf->setMode(ISerializer::ModePut);
257 for (IVariable* var : vars) {
258 var->serialize(sbuf, share_ids, nullptr);
259 }
260 }
261 exchanger->processExchange();
262 for (Integer i = 0; i < nb_rank; ++i) {
263 ISerializeMessage* msg = exchanger->messageToReceive(i);
264 ISerializer* sbuf = msg->serializer();
265 Int32ConstArrayView ghost_ids = m_sync_info->receiveInfo().localIds(i);
266 sbuf->setMode(ISerializer::ModeGet);
267 for (IVariable* var : vars) {
268 var->serialize(sbuf, ghost_ids, nullptr);
269 }
270 }
271}
272
273/*---------------------------------------------------------------------------*/
274/*---------------------------------------------------------------------------*/
275
276/*---------------------------------------------------------------------------*/
277/*---------------------------------------------------------------------------*/
278
284class ARCANE_IMPL_EXPORT DataSynchronizeMultiDispatcherV2
285: public DataSynchronizeDispatcherBase
287{
288 public:
289
290 explicit DataSynchronizeMultiDispatcherV2(const DataSynchronizeDispatcherBuildInfo& bi)
291 : DataSynchronizeDispatcherBase(bi)
292 , m_sync_buffer(bi.parallelMng()->traceMng(), m_sync_info.get(), bi.bufferCopier())
293 {
294 }
295
296 void compute() override { _compute(); }
297 void setSynchronizeBuffer(Ref<MemoryBuffer> buffer) override { m_sync_buffer.setSynchronizeBuffer(buffer); }
298 void synchronize(ConstArrayView<IVariable*> vars) override;
299
300 private:
301
302 MultiDataSynchronizeBuffer m_sync_buffer;
303};
304
305/*---------------------------------------------------------------------------*/
306/*---------------------------------------------------------------------------*/
307
308void DataSynchronizeMultiDispatcherV2::
309synchronize(ConstArrayView<IVariable*> vars)
310{
311 const Int32 nb_var = vars.size();
312 m_sync_buffer.setNbData(nb_var);
313
314 // Retrieves the memory locations and size of the variable data
315 {
316 Int32 index = 0;
317 for (IVariable* var : vars) {
318 INumericDataInternal* numapi = var->data()->_commonInternal()->numericData();
319 if (!numapi)
320 ARCANE_FATAL("Variable '{0}' can not be synchronized because it is not a numeric data", var->name());
321 MutableMemoryView mem_view = numapi->memoryView();
322 m_sync_buffer.setDataView(index, mem_view);
323 ++index;
324 }
325 }
326
327 // TODO: should be passed as a function parameter
328 bool is_compare_sync = false;
329 m_sync_buffer.prepareSynchronize(is_compare_sync);
330
331 m_synchronize_implementation->beginSynchronize(&m_sync_buffer);
332 m_synchronize_implementation->endSynchronize(&m_sync_buffer);
333}
334
335/*---------------------------------------------------------------------------*/
336/*---------------------------------------------------------------------------*/
337
338/*---------------------------------------------------------------------------*/
339/*---------------------------------------------------------------------------*/
340
346class SimpleDataSynchronizeImplementation
348{
349 public:
350
351 class Factory;
352 explicit SimpleDataSynchronizeImplementation(Factory* f);
353
354 protected:
355
356 void compute() override {}
357 void beginSynchronize(IDataSynchronizeBuffer* buf) override;
358 void endSynchronize(IDataSynchronizeBuffer* buf) override;
359
360 private:
361
362 IParallelMng* m_parallel_mng = nullptr;
363 UniqueArray<Parallel::Request> m_all_requests;
364};
365
366/*---------------------------------------------------------------------------*/
367/*---------------------------------------------------------------------------*/
368
371{
372 public:
373
374 explicit Factory(IParallelMng* pm)
375 : m_parallel_mng(pm)
376 {}
377
378 Ref<IDataSynchronizeImplementation> createInstance() override
379 {
380 auto* x = new SimpleDataSynchronizeImplementation(this);
382 }
383
384 public:
385
386 IParallelMng* m_parallel_mng = nullptr;
387};
388
389/*---------------------------------------------------------------------------*/
390/*---------------------------------------------------------------------------*/
391
392SimpleDataSynchronizeImplementation::
393SimpleDataSynchronizeImplementation(Factory* f)
394: m_parallel_mng(f->m_parallel_mng)
395{
396}
397
398/*---------------------------------------------------------------------------*/
399/*---------------------------------------------------------------------------*/
400
402arcaneCreateSimpleVariableSynchronizerFactory(IParallelMng* pm)
403{
406}
407
408/*---------------------------------------------------------------------------*/
409/*---------------------------------------------------------------------------*/
410
411void SimpleDataSynchronizeImplementation::
412beginSynchronize(IDataSynchronizeBuffer* vs_buf)
413{
414 ARCANE_CHECK_POINTER(vs_buf);
415 IParallelMng* pm = m_parallel_mng;
416
417 const bool use_blocking_send = false;
418 Int32 nb_message = vs_buf->nbRank();
419
420 /*pm->traceMng()->info() << " ** ** COMMON BEGIN SYNC n=" << nb_message
421 << " this=" << (IVariableSynchronizeDispatcher*)this
422 << " m_sync_info=" << &this->m_sync_info;*/
423
424 // Sends non-blocking receive messages
425 for (Integer i = 0; i < nb_message; ++i) {
426 Int32 target_rank = vs_buf->targetRank(i);
427 auto buf = _toLegacySmallView(vs_buf->receiveBuffer(i));
428 if (!buf.empty()) {
429 Parallel::Request rval = pm->recv(buf, target_rank, false);
430 m_all_requests.add(rval);
431 }
432 }
433
434 vs_buf->copyAllSend();
435
436 // Sends send messages in non-blocking mode.
437 for (Integer i = 0; i < nb_message; ++i) {
438 Int32 target_rank = vs_buf->targetRank(i);
439 auto buf = _toLegacySmallView(vs_buf->sendBuffer(i));
440
441 //ConstArrayView<SimpleType> const_share = share_local_buffer;
442 if (!buf.empty()) {
443 //for( Integer i=0, is=share_local_buffer.size(); i<is; ++i )
444 //trace->info() << "TO rank=" << vsi.m_target_rank << " I=" << i << " V=" << share_local_buffer[i]
445 // << " lid=" << share_grp[i] << " v2=" << var_values[share_grp[i]];
446 Parallel::Request rval = pm->send(buf, target_rank, use_blocking_send);
447 if (!use_blocking_send)
448 m_all_requests.add(rval);
449 }
450 }
451}
452
453/*---------------------------------------------------------------------------*/
454/*---------------------------------------------------------------------------*/
455
456void SimpleDataSynchronizeImplementation::
457endSynchronize(IDataSynchronizeBuffer* vs_buf)
458{
459 IParallelMng* pm = m_parallel_mng;
460
461 /*pm->traceMng()->info() << " ** ** COMMON END SYNC n=" << nb_message
462 << " this=" << (IVariableSynchronizeDispatcher*)this
463 << " m_sync_info=" << &this->m_sync_info;*/
464
465 // Waits for receptions to finish
466 pm->waitAllRequests(m_all_requests);
467 m_all_requests.clear();
468
469 // Copies the return message back into the variable.
470 vs_buf->copyAllReceive();
471}
472
473/*---------------------------------------------------------------------------*/
474/*---------------------------------------------------------------------------*/
475
476/*---------------------------------------------------------------------------*/
477/*---------------------------------------------------------------------------*/
478
479IDataSynchronizeMultiDispatcher* IDataSynchronizeMultiDispatcher::
480create(const DataSynchronizeDispatcherBuildInfo& bi)
481{
482 // TODO: Once the old mechanism is removed, the API must be modified not
483 // to use 'VariableCollection' but a list of \a INumericDataInternal
484 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCANE_USE_LEGACY_MULTISYNCHRONIZE", true))
485 if (v.value() >= 1)
486 return new DataSynchronizeMultiDispatcher(bi);
487 return new DataSynchronizeMultiDispatcherV2(bi);
488}
489
490/*---------------------------------------------------------------------------*/
491/*---------------------------------------------------------------------------*/
492
493} // namespace Arcane
494
495/*---------------------------------------------------------------------------*/
496/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro returning the pointer ptr if it is not null or throwing an exception if it is null.
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
#define ARCCORE_DEFINE_REFERENCE_COUNTED_INCLASS_METHODS()
Macro to define methods managing counters of references.
Modifiable view of an array of type T.
Constant view of an array of type T.
static std::optional< Int32 > tryParseFromEnvironment(StringView s, bool throw_if_invalid)
void _compute()
Notifies the implementation that the synchronization information has changed.
Info to build a DataSynchronizeDispatcher.
Manages synchronization for a data item.
DataSynchronizeResult endSynchronize() override
Ends the synchronization.
void setSynchronizeBuffer(Ref< MemoryBuffer > buffer) override
Positions the synchronization buffer.
void compute() override
Recalculates the necessary information after an update to the DataSynchronizeInfo.
SingleDataSynchronizeBuffer m_sync_buffer
Manages send and receive buffers for synchronization.
void beginSynchronize(INumericDataInternal *data, bool is_compare_sync) override
Starts the execution for synchronization for the data data.
void compute() override
Recalculates the necessary information after an update to the DataSynchronizeInfo.
void setSynchronizeBuffer(Ref< MemoryBuffer > buffer) override
Positions the synchronization buffer.
void compute() override
Recalculates the necessary information after an update to the DataSynchronizeInfo.
void setSynchronizeBuffer(Ref< MemoryBuffer >) override
Positions the synchronization buffer.
Information about the result of a synchronization.
Generic buffer for data synchronization.
Interface to manage the synchronization of a data item.
Interface for a generic dispatcher factory.
Interface for synchronizing a list of variables.
Interface for an 'IData' of a numeric type.
virtual MutableMemoryView memoryView()=0
Memory view of the data.
Interface of the parallelism manager for a subdomain.
virtual ITraceMng * traceMng() const =0
Trace manager.
virtual void waitAllRequests(ArrayView< Request > rvalues)=0
Blocks while waiting for the rvalues requests to complete.
IDataSynchronizeBuffer implementation for multiple data items.
Mutable view on a contiguous memory region containing fixed-size elements.
constexpr SpanType bytes() const
View in byte form.
Reference to an instance.
Thread-safe implementation of a reference counter.
IDataSynchronizeBuffer implementation for a single data item.
View of an array of elements of type T.
Definition Span.h:635
1D data vector with value semantics (STL style).
Ref< IParallelExchanger > createExchangerRef(IParallelMng *pm)
Returns an interface to transfer messages between ranks.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
Int32 Integer
Type representing an integer.
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
Definition UtilsTypes.h:482
UniqueArray< Int32 > Int32UniqueArray
Dynamic 1D array of 32-bit integers.
Definition UtilsTypes.h:341
unsigned char Byte
Type of a byte.
Definition BaseTypes.h:43
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.