Arcane  v3.16.6.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
VariableSynchronizerMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* VariableSynchronizerMng.cc (C) 2000-2025 */
9/* */
10/* Gestionnaire des synchroniseurs de variables. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/impl/internal/VariableSynchronizerMng.h"
15
16#include "arcane/utils/PlatformUtils.h"
17#include "arcane/utils/ValueConvert.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/OStringStream.h"
20#include "arcane/utils/internal/MemoryBuffer.h"
21
22#include "arcane/core/IVariableMng.h"
23#include "arcane/core/IParallelMng.h"
24#include "arcane/core/VariableSynchronizerEventArgs.h"
25#include "arcane/core/IVariable.h"
26
27#include <map>
28#include <mutex>
29#include <stack>
30
31/*---------------------------------------------------------------------------*/
32/*---------------------------------------------------------------------------*/
33
34namespace Arcane
35{
36
37/*---------------------------------------------------------------------------*/
38/*---------------------------------------------------------------------------*/
55class VariableSynchronizerStats
56: public TraceAccessor
57{
58 public:
59
60 // On utilise un ReduceMin pour la valeur de comparaison.
61 // Pour qu'on considère comme identique, il faut que tout les rangs
62 // soient identiques. Il suffit d'un rang 'Unknown' pour considérer
63 // que c'est 'Unknown'. Il faut donc que 'Unknown' soit la valeur la
64 // plus faible et 'Same' la plus élevée.
65 static constexpr unsigned char LOCAL_UNKNOWN = 0;
66 static constexpr unsigned char LOCAL_DIFF = 1;
67 static constexpr unsigned char LOCAL_SAME = 2;
68
69 public:
70
72 {
73 public:
74
75 void add(const StatInfo& x)
76 {
77 m_count += x.m_count;
78 m_nb_same += x.m_nb_same;
79 m_nb_different += x.m_nb_different;
80 m_nb_unknown += x.m_nb_unknown;
81 }
82
83 public:
84
85 Int32 m_count = 0;
86 Int32 m_nb_same = 0;
87 Int32 m_nb_different = 0;
88 Int32 m_nb_unknown = 0;
89 };
90
91 public:
92
93 explicit VariableSynchronizerStats(VariableSynchronizerMng* vsm)
94 : TraceAccessor(vsm->traceMng())
95 , m_variable_synchronizer_mng(vsm)
96 {}
97
98 public:
99
100 void init()
101 {
102 if (m_is_event_registered)
103 ARCANE_FATAL("instance is already initialized.");
104 auto handler = [&](const VariableSynchronizerEventArgs& args) {
105 _handleEvent(args);
106 };
107 m_variable_synchronizer_mng->onSynchronized().attach(m_observer_pool, handler);
108 m_is_event_registered = true;
109 }
110
111 void flushPendingStats(IParallelMng* pm);
112
113 Int32 dumpStats(std::ostream& ostr)
114 {
115 std::streamsize old_precision = ostr.precision(20);
116 ostr << "Synchronization Stats\n";
117 ostr << Trace::Width(8) << "Total"
118 << Trace::Width(8) << " Nb "
119 << Trace::Width(8) << " Nb "
120 << Trace::Width(8) << " Nb "
121 << " Variable name"
122 << "\n";
123 ostr << Trace::Width(8) << "Count"
124 << Trace::Width(8) << "Same"
125 << Trace::Width(8) << "Diff"
126 << Trace::Width(8) << "Unknown"
127 << "\n";
128 StatInfo total_stat;
129 for (const auto& p : m_stats) {
130 total_stat.add(p.second);
131 ostr << " " << Trace::Width(7) << p.second.m_count
132 << " " << Trace::Width(7) << p.second.m_nb_same
133 << " " << Trace::Width(7) << p.second.m_nb_different
134 << " " << Trace::Width(7) << p.second.m_nb_unknown
135 << " " << p.first
136 << "\n";
137 }
138 ostr << "\n";
139 ostr << " " << Trace::Width(7) << total_stat.m_count
140 << " " << Trace::Width(7) << total_stat.m_nb_same
141 << " " << Trace::Width(7) << total_stat.m_nb_different
142 << " " << Trace::Width(7) << total_stat.m_nb_unknown
143 << " "
144 << "TOTAL"
145 << "\n\n";
146 ostr.precision(old_precision);
147 return total_stat.m_count;
148 }
149
150 private:
151
152 VariableSynchronizerMng* m_variable_synchronizer_mng = nullptr;
153 EventObserverPool m_observer_pool;
154 std::map<String, StatInfo> m_stats;
155 bool m_is_event_registered = false;
156 UniqueArray<String> m_pending_variable_name_list;
157 UniqueArray<unsigned char> m_pending_compare_status_list;
158
159 private:
160
161 void _handleEvent(const VariableSynchronizerEventArgs& args);
162};
163
164void VariableSynchronizerStats::
165_handleEvent(const VariableSynchronizerEventArgs& args)
166{
167 // On ne traite que les évènements de fin de synchronisation
168 if (args.state() != VariableSynchronizerEventArgs::State::EndSynchronize)
169 return;
170 if (!m_variable_synchronizer_mng->isDoingStats())
171 return;
172 Int32 level = m_variable_synchronizer_mng->synchronizationCompareLevel();
173 IParallelMng* pm = m_variable_synchronizer_mng->parallelMng();
174 auto compare_status_list = args.compareStatusList();
175 {
176 Int32 index = 0;
177 for (IVariable* var : args.variables()) {
178 m_pending_variable_name_list.add(var->fullName());
179 VariableSynchronizerEventArgs::CompareStatus s = compare_status_list[index];
180 unsigned char rs = LOCAL_UNKNOWN; // Compare == Unknown;
182 rs = LOCAL_SAME;
184 rs = LOCAL_DIFF;
185 m_pending_compare_status_list.add(rs);
186 ++index;
187 if (level >= 2) {
188 // On fait la réduction ici car on veut savoir immédiatement s'il y a une
189 // différence.
190 unsigned char global_rs = pm->reduce(Parallel::ReduceMax, rs);
191 if (global_rs == LOCAL_SAME) {
192 info() << "Synchronize: same values for variable name=" << var->fullName();
193 if (level >= 3)
194 info() << "Stack=" << platform::getStackTrace();
195 }
196 }
197 }
198 }
199}
200
201/*---------------------------------------------------------------------------*/
202/*---------------------------------------------------------------------------*/
203
204void VariableSynchronizerStats::
205flushPendingStats(IParallelMng* pm)
206{
207 Int32 nb_pending = m_pending_variable_name_list.size();
208 Int32 total_nb_pending = pm->reduce(Parallel::ReduceMax, nb_pending);
209 if (total_nb_pending != nb_pending)
210 ARCANE_FATAL("Bad number of pending stats local={0} global={1}", nb_pending, total_nb_pending);
211 pm->reduce(Parallel::ReduceMin, m_pending_compare_status_list);
212 for (Int32 i = 0; i < total_nb_pending; ++i) {
213 unsigned char rs = m_pending_compare_status_list[i];
214 auto& v = m_stats[m_pending_variable_name_list[i]];
215 if (rs == LOCAL_SAME)
216 ++v.m_nb_same;
217 else if (rs == LOCAL_DIFF)
218 ++v.m_nb_different;
219 else
220 ++v.m_nb_unknown;
221 ++v.m_count;
222 }
223 m_pending_variable_name_list.clear();
224 m_pending_compare_status_list.clear();
225}
226
227/*---------------------------------------------------------------------------*/
228/*---------------------------------------------------------------------------*/
229
230/*---------------------------------------------------------------------------*/
231/*---------------------------------------------------------------------------*/
232
233VariableSynchronizerMng::
234VariableSynchronizerMng(IVariableMng* vm)
235: TraceAccessor(vm->traceMng())
236, m_variable_mng(vm)
237, m_parallel_mng(vm->parallelMng())
238, m_stats(new VariableSynchronizerStats(this))
239{
240 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCANE_AUTO_COMPARE_SYNCHRONIZE", true)) {
241 m_synchronize_compare_level = v.value();
242 // Si on active la comparaison, on active aussi les statistiques
243 m_is_doing_stats = m_synchronize_compare_level > 0;
244 }
245 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCANE_SYNCHRONIZE_STATS", true))
246 m_is_doing_stats = (v.value() != 0);
247}
248
249/*---------------------------------------------------------------------------*/
250/*---------------------------------------------------------------------------*/
251
252VariableSynchronizerMng::
253~VariableSynchronizerMng()
254{
255 delete m_stats;
256}
257
258/*---------------------------------------------------------------------------*/
259/*---------------------------------------------------------------------------*/
260
261void VariableSynchronizerMng::
262initialize()
263{
264 m_stats->init();
265}
266
267/*---------------------------------------------------------------------------*/
268/*---------------------------------------------------------------------------*/
269
271dumpStats(std::ostream& ostr) const
272{
273 if (!m_parallel_mng->isParallel())
274 return;
275 {
276 OStringStream ostr2;
277 Int32 count = m_stats->dumpStats(ostr2());
278 if (count > 0)
279 ostr << ostr2.str();
280 }
281 m_internal_api.dumpStats(ostr);
282}
283
284/*---------------------------------------------------------------------------*/
285/*---------------------------------------------------------------------------*/
286
289{
290 if (isDoingStats())
291 m_stats->flushPendingStats(m_parallel_mng);
292}
293
294/*---------------------------------------------------------------------------*/
295/*---------------------------------------------------------------------------*/
302{
303 public:
304
305 using MemoryBufferMap = std::map<MemoryBuffer*, Ref<MemoryBuffer>>;
306 using MapList = std::map<IMemoryAllocator*, MemoryBufferMap>;
307
308 using FreeList = std::map<IMemoryAllocator*, std::stack<Ref<MemoryBuffer>>>;
309
310 public:
311
312 Ref<MemoryBuffer> createSynchronizeBuffer(IMemoryAllocator* allocator);
313 void releaseSynchronizeBuffer(IMemoryAllocator* allocator, MemoryBuffer* v);
314 void dumpStats(std::ostream& ostr) const;
315
316 private:
317
319 MapList m_used_map;
320
322 FreeList m_free_map;
323
325 mutable std::mutex m_mutex;
326};
327
328/*---------------------------------------------------------------------------*/
329/*---------------------------------------------------------------------------*/
330/*
331 * \brief Créé ou récupère un buffer.
332 *
333 * Il est possible de créer des buffers avec un allocateur nul. Dans ce
334 * cas, ce sera l'allocateur par défaut qui sera utilisé et donc pour
335 * un MemoryBuffer donné, on n'aura pas forcément new_buffer.allocator()==allocator.
336 * Il faut donc toujours utiliser \a allocator.
337 */
338Ref<MemoryBuffer> VariableSynchronizerMng::InternalApi::BufferList::
339createSynchronizeBuffer(IMemoryAllocator* allocator)
340{
341 std::scoped_lock lock(m_mutex);
342
343 auto& free_map = m_free_map;
344 auto x = free_map.find(allocator);
345 Ref<MemoryBuffer> new_buffer;
346 // Regarde si un buffer est disponible dans \a free_map.
347 if (x == free_map.end()) {
348 // Aucun buffer associé à cet allocator, on en crée un
349 new_buffer = MemoryBuffer::create(allocator);
350 }
351 else {
352 auto& buffer_stack = x->second;
353 // Si la pile est vide, on crée un buffer. Sinon, on prend le premier
354 // de la pile.
355 if (buffer_stack.empty()) {
356 new_buffer = MemoryBuffer::create(allocator);
357 }
358 else {
359 new_buffer = buffer_stack.top();
360 buffer_stack.pop();
361 }
362 }
363
364 // Enregistre l'instance dans la liste utilisée
365 m_used_map[allocator].insert(std::make_pair(new_buffer.get(), new_buffer));
366 return new_buffer;
367}
368
369/*---------------------------------------------------------------------------*/
370/*---------------------------------------------------------------------------*/
371
372void VariableSynchronizerMng::InternalApi::BufferList::
373releaseSynchronizeBuffer(IMemoryAllocator* allocator, MemoryBuffer* v)
374{
375 std::scoped_lock lock(m_mutex);
376
377 auto& main_map = m_used_map;
378 auto x = main_map.find(allocator);
379 if (x == main_map.end())
380 ARCANE_FATAL("Invalid allocator '{0}'", allocator);
381
382 auto& sub_map = x->second;
383 auto x2 = sub_map.find(v);
384 if (x2 == sub_map.end())
385 ARCANE_FATAL("Invalid buffer '{0}'", v);
386
387 Ref<MemoryBuffer> ref_memory = x2->second;
388
389 sub_map.erase(x2);
390
391 m_free_map[allocator].push(ref_memory);
392}
393
394/*---------------------------------------------------------------------------*/
395/*---------------------------------------------------------------------------*/
396
398dumpStats(std::ostream& ostr) const
399{
400 std::scoped_lock lock(m_mutex);
401
403 for (const auto& x : m_used_map)
404 ostr << "SynchronizeBuffer: nb_used_map = " << x.second.size() << "\n";
405
407 for (const auto& x : m_free_map)
408 ostr << "SynchronizeBuffer: nb_free_map = " << x.second.size() << "\n";
409}
410
411/*---------------------------------------------------------------------------*/
412/*---------------------------------------------------------------------------*/
413
414/*---------------------------------------------------------------------------*/
415/*---------------------------------------------------------------------------*/
416
417VariableSynchronizerMng::InternalApi::
418InternalApi(VariableSynchronizerMng* vms)
419: TraceAccessor(vms->traceMng())
420, m_synchronizer_mng(vms)
421, m_buffer_list(new BufferList())
422{
423}
424
425/*---------------------------------------------------------------------------*/
426/*---------------------------------------------------------------------------*/
427
428VariableSynchronizerMng::InternalApi::
429~InternalApi()
430{
431 // Le destructeur ne peut pas être supprimé car 'm_buffer_list' n'est pas
432 // connu lors de la définition de la classe.
433}
434
435/*---------------------------------------------------------------------------*/
436/*---------------------------------------------------------------------------*/
437
438Ref<MemoryBuffer> VariableSynchronizerMng::InternalApi::
439createSynchronizeBuffer(IMemoryAllocator* allocator)
440{
441 return m_buffer_list->createSynchronizeBuffer(allocator);
442}
443
444/*---------------------------------------------------------------------------*/
445/*---------------------------------------------------------------------------*/
446
447void VariableSynchronizerMng::InternalApi::
448releaseSynchronizeBuffer(IMemoryAllocator* allocator, MemoryBuffer* v)
449{
450 m_buffer_list->releaseSynchronizeBuffer(allocator, v);
451}
452
453/*---------------------------------------------------------------------------*/
454/*---------------------------------------------------------------------------*/
455
456void VariableSynchronizerMng::InternalApi::
457dumpStats(std::ostream& ostr) const
458{
459 m_buffer_list->dumpStats(ostr);
460}
461
462/*---------------------------------------------------------------------------*/
463/*---------------------------------------------------------------------------*/
464
465} // End namespace Arcane
466
467/*---------------------------------------------------------------------------*/
468/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
static std::optional< Int32 > tryParseFromEnvironment(StringView s, bool throw_if_invalid)
Definition Convert.cc:122
Interface du gestionnaire de parallélisme pour un sous-domaine.
Interface du gestionnaire de variables.
Gestion d'un buffer mémoire.
Flot de sortie lié à une String.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
TraceAccessor(ITraceMng *m)
Construit un accesseur via le gestionnaire de trace m.
TraceMessage info() const
Flot pour un message d'information.
ITraceMng * traceMng() const
Gestionnaire de trace.
Arguments de l'évènement notifiant une synchronisation de variables.
CompareStatus
Comparaison des valeurs des entités fantômes avant/après une synchronisation.
@ Different
Valeurs différentes avant et après la synchronisation.
@ Same
Même valeurs avant et après la synchronisation.
Gère un pool de buffer associé à un allocateur.
FreeList m_free_map
Liste par allocateur des buffers libres.
MapList m_used_map
Liste par allocateur des buffers en cours d'utilisation.
std::mutex m_mutex
Mutex pour protéger la création/récupération des buffers.
Gestionnaire des synchroniseurs de variables.
EventObservable< const VariableSynchronizerEventArgs & > & onSynchronized() override
Évènement envoyé en début et fin de synchronisation.
void dumpStats(std::ostream &ostr) const override
Affiche les statistiques sur le flot ostr.
void flushPendingStats() override
Traite les statistiques en cours.
Statistiques de synchronisation.
@ ReduceMin
Minimum des valeurs.
@ ReduceMax
Maximum des valeurs.
ARCCORE_BASE_EXPORT String getStackTrace()
Retourne une chaîne de caractere contenant la pile d'appel.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
std::int32_t Int32
Type entier signé sur 32 bits.