14#include "arcane/impl/internal/VariableSynchronizerMng.h"
16#include "arcane/utils/PlatformUtils.h"
17#include "arcane/utils/ValueConvert.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/OStringStream.h"
20#include "arcane/utils/internal/MemoryBuffer.h"
22#include "arcane/core/IVariableMng.h"
23#include "arcane/core/IParallelMng.h"
24#include "arcane/core/VariableSynchronizerEventArgs.h"
25#include "arcane/core/IVariable.h"
56class VariableSynchronizerStats
65 static constexpr unsigned char LOCAL_UNKNOWN = 0;
66 static constexpr unsigned char LOCAL_DIFF = 1;
67 static constexpr unsigned char LOCAL_SAME = 2;
78 m_nb_same += x.m_nb_same;
79 m_nb_different += x.m_nb_different;
80 m_nb_unknown += x.m_nb_unknown;
87 Int32 m_nb_different = 0;
88 Int32 m_nb_unknown = 0;
95 , m_variable_synchronizer_mng(vsm)
102 if (m_is_event_registered)
107 m_variable_synchronizer_mng->
onSynchronized().attach(m_observer_pool, handler);
108 m_is_event_registered =
true;
111 void flushPendingStats(IParallelMng* pm);
113 Int32 dumpStats(std::ostream& ostr)
115 std::streamsize old_precision = ostr.precision(20);
116 ostr <<
"Synchronization Stats\n";
117 ostr << Trace::Width(8) <<
"Total"
118 << Trace::Width(8) <<
" Nb "
119 << Trace::Width(8) <<
" Nb "
120 << Trace::Width(8) <<
" Nb "
123 ostr << Trace::Width(8) <<
"Count"
124 << Trace::Width(8) <<
"Same"
125 << Trace::Width(8) <<
"Diff"
126 << Trace::Width(8) <<
"Unknown"
129 for (
const auto& p : m_stats) {
130 total_stat.add(p.second);
131 ostr <<
" " << Trace::Width(7) << p.second.m_count
132 <<
" " << Trace::Width(7) << p.second.m_nb_same
133 <<
" " << Trace::Width(7) << p.second.m_nb_different
134 <<
" " << Trace::Width(7) << p.second.m_nb_unknown
139 ostr <<
" " << Trace::Width(7) << total_stat.m_count
140 <<
" " << Trace::Width(7) << total_stat.m_nb_same
141 <<
" " << Trace::Width(7) << total_stat.m_nb_different
142 <<
" " << Trace::Width(7) << total_stat.m_nb_unknown
146 ostr.precision(old_precision);
147 return total_stat.m_count;
152 VariableSynchronizerMng* m_variable_synchronizer_mng =
nullptr;
153 EventObserverPool m_observer_pool;
154 std::map<String, StatInfo> m_stats;
155 bool m_is_event_registered =
false;
156 UniqueArray<String> m_pending_variable_name_list;
157 UniqueArray<unsigned char> m_pending_compare_status_list;
161 void _handleEvent(
const VariableSynchronizerEventArgs& args);
164void VariableSynchronizerStats::
168 if (args.state() != VariableSynchronizerEventArgs::State::EndSynchronize)
170 if (!m_variable_synchronizer_mng->isDoingStats())
172 Int32 level = m_variable_synchronizer_mng->synchronizationCompareLevel();
173 IParallelMng* pm = m_variable_synchronizer_mng->parallelMng();
174 auto compare_status_list = args.compareStatusList();
177 for (IVariable* var : args.variables()) {
178 m_pending_variable_name_list.add(var->fullName());
180 unsigned char rs = LOCAL_UNKNOWN;
185 m_pending_compare_status_list.add(rs);
191 if (global_rs == LOCAL_SAME) {
192 info() <<
"Synchronize: same values for variable name=" << var->fullName();
204void VariableSynchronizerStats::
207 Int32 nb_pending = m_pending_variable_name_list.size();
209 if (total_nb_pending != nb_pending)
210 ARCANE_FATAL(
"Bad number of pending stats local={0} global={1}", nb_pending, total_nb_pending);
212 for (
Int32 i = 0; i < total_nb_pending; ++i) {
213 unsigned char rs = m_pending_compare_status_list[i];
214 auto& v = m_stats[m_pending_variable_name_list[i]];
215 if (rs == LOCAL_SAME)
217 else if (rs == LOCAL_DIFF)
223 m_pending_variable_name_list.clear();
224 m_pending_compare_status_list.clear();
233VariableSynchronizerMng::
237, m_parallel_mng(vm->parallelMng())
241 m_synchronize_compare_level = v.value();
243 m_is_doing_stats = m_synchronize_compare_level > 0;
246 m_is_doing_stats = (v.value() != 0);
252VariableSynchronizerMng::
253~VariableSynchronizerMng()
261void VariableSynchronizerMng::
273 if (!m_parallel_mng->isParallel())
277 Int32 count = m_stats->dumpStats(ostr2());
281 m_internal_api.dumpStats(ostr);
291 m_stats->flushPendingStats(m_parallel_mng);
306 using MemoryBufferMap = std::map<MemoryBuffer*, Ref<MemoryBuffer>>;
307 using MapList = std::map<IMemoryAllocator*, MemoryBufferMap>;
309 using FreeList = std::map<IMemoryAllocator*, std::stack<Ref<MemoryBuffer>>>;
315 void dumpStats(std::ostream& ostr)
const;
343 std::scoped_lock lock(m_mutex);
345 auto& free_map = m_free_map;
346 auto x = free_map.find(allocator);
349 if (x == free_map.end()) {
351 new_buffer = MemoryBuffer::create(allocator);
354 auto& buffer_stack = x->second;
357 if (buffer_stack.empty()) {
358 new_buffer = MemoryBuffer::create(allocator);
361 new_buffer = buffer_stack.top();
367 m_used_map[allocator].insert(std::make_pair(new_buffer.
get(), new_buffer));
374void VariableSynchronizerMng::InternalApi::BufferList::
375releaseSynchronizeBuffer(IMemoryAllocator* allocator, MemoryBuffer* v)
377 std::scoped_lock lock(m_mutex);
379 auto& main_map = m_used_map;
380 auto x = main_map.find(allocator);
381 if (x == main_map.end())
384 auto& sub_map = x->second;
385 auto x2 = sub_map.find(v);
386 if (x2 == sub_map.end())
389 Ref<MemoryBuffer> ref_memory = x2->second;
393 m_free_map[allocator].push(ref_memory);
402 std::scoped_lock lock(
m_mutex);
406 ostr <<
"SynchronizeBuffer: nb_used_map = " << x.second.size() <<
"\n";
410 ostr <<
"SynchronizeBuffer: nb_free_map = " << x.second.size() <<
"\n";
419VariableSynchronizerMng::InternalApi::
420InternalApi(VariableSynchronizerMng* vms)
422, m_synchronizer_mng(vms)
430VariableSynchronizerMng::InternalApi::
443 return m_buffer_list->createSynchronizeBuffer(allocator);
449void VariableSynchronizerMng::InternalApi::
450releaseSynchronizeBuffer(IMemoryAllocator* allocator, MemoryBuffer* v)
452 m_buffer_list->releaseSynchronizeBuffer(allocator, v);
458void VariableSynchronizerMng::InternalApi::
459dumpStats(std::ostream& ostr)
const
461 m_buffer_list->dumpStats(ostr);
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
static std::optional< Int32 > tryParseFromEnvironment(StringView s, bool throw_if_invalid)
Interface for a memory allocator.
Interface of the parallelism manager for a subdomain.
Variable manager interface.
Management of a memory buffer.
Output stream linked to a String.
InstanceType * get() const
Associated instance or nullptr if none.
Reference to an instance.
TraceAccessor(ITraceMng *m)
Constructs an accessor via the trace manager m.
TraceMessage info() const
Flow for an information message.
ITraceMng * traceMng() const
Trace manager.
Arguments for the event notifying a variable synchronization.
CompareStatus
Comparison of phantom entity values before/after a synchronization.
@ Different
Different values before and after synchronization.
@ Same
Same values before and after synchronization.
Manages a pool of buffers associated with an allocator.
FreeList m_free_map
List of free buffers by allocator.
void dumpStats(std::ostream &ostr) const
MapList m_used_map
List of buffers currently in use by allocator.
std::mutex m_mutex
Mutex to protect buffer creation/retrieval.
Variable synchronizer manager.
EventObservable< const VariableSynchronizerEventArgs & > & onSynchronized() override
Event sent at the beginning and end of synchronization.
void dumpStats(std::ostream &ostr) const override
Prints statistics to the stream ostr.
void flushPendingStats() override
Processes pending statistics.
Synchronization statistics.
@ ReduceMin
Minimum of values.
@ ReduceMax
Maximum of values.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
std::int32_t Int32
Signed integer type of 32 bits.