Arcane  v3.14.10.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
VariableSynchronizerMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2023 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* VariableSynchronizerMng.cc (C) 2000-2023 */
9/* */
10/* Gestionnaire des synchroniseurs de variables. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/impl/internal/VariableSynchronizerMng.h"
15
16#include "arcane/utils/PlatformUtils.h"
17#include "arcane/utils/ValueConvert.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/OStringStream.h"
20#include "arcane/utils/internal/MemoryBuffer.h"
21
22#include "arcane/core/IVariableMng.h"
23#include "arcane/core/IParallelMng.h"
24#include "arcane/core/VariableSynchronizerEventArgs.h"
25#include "arcane/core/IVariable.h"
26
27#include <map>
28#include <stack>
29
30/*---------------------------------------------------------------------------*/
31/*---------------------------------------------------------------------------*/
32
33namespace Arcane
34{
35
36/*---------------------------------------------------------------------------*/
37/*---------------------------------------------------------------------------*/
55: public TraceAccessor
56{
57 public:
58
59 // On utilise un ReduceMin pour la valeur de comparaison.
60 // Pour qu'on considère comme identique, il faut que tout les rangs
61 // soient identiques. Il suffit d'un rang 'Unknown' pour considérer
62 // que c'est 'Unknown'. Il faut donc que 'Unknown' soit la valeur la
63 // plus faible et 'Same' la plus élevée.
64 static constexpr unsigned char LOCAL_UNKNOWN = 0;
65 static constexpr unsigned char LOCAL_DIFF = 1;
66 static constexpr unsigned char LOCAL_SAME = 2;
67
68 public:
69
71 {
72 public:
73
74 void add(const StatInfo& x)
75 {
76 m_count += x.m_count;
77 m_nb_same += x.m_nb_same;
78 m_nb_different += x.m_nb_different;
79 m_nb_unknown += x.m_nb_unknown;
80 }
81
82 public:
83
84 Int32 m_count = 0;
85 Int32 m_nb_same = 0;
86 Int32 m_nb_different = 0;
87 Int32 m_nb_unknown = 0;
88 };
89
90 public:
91
94 , m_variable_synchronizer_mng(vsm)
95 {}
96
97 public:
98
99 void init()
100 {
101 if (m_is_event_registered)
102 ARCANE_FATAL("instance is already initialized.");
103 auto handler = [&](const VariableSynchronizerEventArgs& args) {
104 _handleEvent(args);
105 };
106 m_variable_synchronizer_mng->onSynchronized().attach(m_observer_pool, handler);
107 m_is_event_registered = true;
108 }
109
110 void flushPendingStats(IParallelMng* pm);
111
112 Int32 dumpStats(std::ostream& ostr)
113 {
114 std::streamsize old_precision = ostr.precision(20);
115 ostr << "Synchronization Stats\n";
116 ostr << Trace::Width(8) << "Total"
117 << Trace::Width(8) << " Nb "
118 << Trace::Width(8) << " Nb "
119 << Trace::Width(8) << " Nb "
120 << " Variable name"
121 << "\n";
122 ostr << Trace::Width(8) << "Count"
123 << Trace::Width(8) << "Same"
124 << Trace::Width(8) << "Diff"
125 << Trace::Width(8) << "Unknown"
126 << "\n";
127 StatInfo total_stat;
128 for (const auto& p : m_stats) {
129 total_stat.add(p.second);
130 ostr << " " << Trace::Width(7) << p.second.m_count
131 << " " << Trace::Width(7) << p.second.m_nb_same
132 << " " << Trace::Width(7) << p.second.m_nb_different
133 << " " << Trace::Width(7) << p.second.m_nb_unknown
134 << " " << p.first
135 << "\n";
136 }
137 ostr << "\n";
138 ostr << " " << Trace::Width(7) << total_stat.m_count
139 << " " << Trace::Width(7) << total_stat.m_nb_same
140 << " " << Trace::Width(7) << total_stat.m_nb_different
141 << " " << Trace::Width(7) << total_stat.m_nb_unknown
142 << " "
143 << "TOTAL"
144 << "\n\n";
145 ostr.precision(old_precision);
146 return total_stat.m_count;
147 }
148
149 private:
150
151 VariableSynchronizerMng* m_variable_synchronizer_mng = nullptr;
152 EventObserverPool m_observer_pool;
153 std::map<String, StatInfo> m_stats;
154 bool m_is_event_registered = false;
155 UniqueArray<String> m_pending_variable_name_list;
156 UniqueArray<unsigned char> m_pending_compare_status_list;
157
158 private:
159
160 void _handleEvent(const VariableSynchronizerEventArgs& args);
161};
162
163void VariableSynchronizerStats::
164_handleEvent(const VariableSynchronizerEventArgs& args)
165{
166 // On ne traite que les évènements de fin de synchronisation
167 if (args.state() != VariableSynchronizerEventArgs::State::EndSynchronize)
168 return;
169 if (!m_variable_synchronizer_mng->isDoingStats())
170 return;
171 Int32 level = m_variable_synchronizer_mng->synchronizationCompareLevel();
172 IParallelMng* pm = m_variable_synchronizer_mng->parallelMng();
173 auto compare_status_list = args.compareStatusList();
174 {
175 Int32 index = 0;
176 for (IVariable* var : args.variables()) {
177 m_pending_variable_name_list.add(var->fullName());
178 VariableSynchronizerEventArgs::CompareStatus s = compare_status_list[index];
179 unsigned char rs = LOCAL_UNKNOWN; // Compare == Unknown;
181 rs = LOCAL_SAME;
183 rs = LOCAL_DIFF;
184 m_pending_compare_status_list.add(rs);
185 ++index;
186 if (level >= 2) {
187 // On fait la réduction ici car on veut savoir immédiatement s'il y a une
188 // différence.
189 unsigned char global_rs = pm->reduce(Parallel::ReduceMax, rs);
190 if (global_rs == LOCAL_SAME) {
191 info() << "Synchronize: same values for variable name=" << var->fullName();
192 if (level >= 3)
193 info() << "Stack=" << platform::getStackTrace();
194 }
195 }
196 }
197 }
198}
199
200/*---------------------------------------------------------------------------*/
201/*---------------------------------------------------------------------------*/
202
203void VariableSynchronizerStats::
204flushPendingStats(IParallelMng* pm)
205{
206 Int32 nb_pending = m_pending_variable_name_list.size();
207 Int32 total_nb_pending = pm->reduce(Parallel::ReduceMax, nb_pending);
208 if (total_nb_pending != nb_pending)
209 ARCANE_FATAL("Bad number of pending stats local={0} global={1}", nb_pending, total_nb_pending);
210 pm->reduce(Parallel::ReduceMin, m_pending_compare_status_list);
211 for (Int32 i = 0; i < total_nb_pending; ++i) {
212 unsigned char rs = m_pending_compare_status_list[i];
213 auto& v = m_stats[m_pending_variable_name_list[i]];
214 if (rs == LOCAL_SAME)
215 ++v.m_nb_same;
216 else if (rs == LOCAL_DIFF)
217 ++v.m_nb_different;
218 else
219 ++v.m_nb_unknown;
220 ++v.m_count;
221 }
222 m_pending_variable_name_list.clear();
223 m_pending_compare_status_list.clear();
224}
225
226/*---------------------------------------------------------------------------*/
227/*---------------------------------------------------------------------------*/
228
229/*---------------------------------------------------------------------------*/
230/*---------------------------------------------------------------------------*/
231
232VariableSynchronizerMng::
233VariableSynchronizerMng(IVariableMng* vm)
234: TraceAccessor(vm->traceMng())
235, m_variable_mng(vm)
236, m_parallel_mng(vm->parallelMng())
237, m_stats(new VariableSynchronizerStats(this))
238{
239 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCANE_AUTO_COMPARE_SYNCHRONIZE", true)) {
240 m_synchronize_compare_level = v.value();
241 // Si on active la comparaison, on active aussi les statistiques
242 m_is_doing_stats = m_synchronize_compare_level > 0;
243 }
244 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCANE_SYNCHRONIZE_STATS", true))
245 m_is_doing_stats = (v.value() != 0);
246}
247
248/*---------------------------------------------------------------------------*/
249/*---------------------------------------------------------------------------*/
250
251VariableSynchronizerMng::
252~VariableSynchronizerMng()
253{
254 delete m_stats;
255}
256
257/*---------------------------------------------------------------------------*/
258/*---------------------------------------------------------------------------*/
259
260void VariableSynchronizerMng::
261initialize()
262{
263 m_stats->init();
264}
265
266/*---------------------------------------------------------------------------*/
267/*---------------------------------------------------------------------------*/
268
270dumpStats(std::ostream& ostr) const
271{
272 if (!m_parallel_mng->isParallel())
273 return;
274 {
276 Int32 count = m_stats->dumpStats(ostr2());
277 if (count > 0)
278 ostr << ostr2.str();
279 }
280 m_internal_api.dumpStats(ostr);
281}
282
283/*---------------------------------------------------------------------------*/
284/*---------------------------------------------------------------------------*/
285
288{
289 if (isDoingStats())
290 m_stats->flushPendingStats(m_parallel_mng);
291}
292
293/*---------------------------------------------------------------------------*/
294/*---------------------------------------------------------------------------*/
299{
300 public:
301
302 using MemoryBufferMap = std::map<MemoryBuffer*, Ref<MemoryBuffer>>;
303 using MapList = std::map<IMemoryAllocator*, MemoryBufferMap>;
304
305 using FreeList = std::map<IMemoryAllocator*, std::stack<Ref<MemoryBuffer>>>;
306
307 public:
308
310 MapList m_used_map;
311
313 FreeList m_free_map;
314};
315
316/*---------------------------------------------------------------------------*/
317/*---------------------------------------------------------------------------*/
318
319VariableSynchronizerMng::InternalApi::
320InternalApi(VariableSynchronizerMng* vms)
322, m_synchronizer_mng(vms)
323, m_buffer_list(new BufferList())
324{
325}
326
327/*---------------------------------------------------------------------------*/
328/*---------------------------------------------------------------------------*/
329
330VariableSynchronizerMng::InternalApi::
331~InternalApi()
332{
333 delete m_buffer_list;
334}
335
336/*---------------------------------------------------------------------------*/
337/*---------------------------------------------------------------------------*/
338/*
339 * \brief Créé ou récupère un buffer.
340 *
341 * Il est possible de créer des buffers avec un allocateur nul. Dans ce
342 * cas ce sera l'allocateur par défaut qui sera utilisé et donc pour
343 * un MemoryBuffer donné, on n'aura pas forcément new_buffer.allocator()==allocator.
344 * Il ne faut donc toujours utiliser \a allocator.
345 */
346Ref<MemoryBuffer> VariableSynchronizerMng::InternalApi::
347createSynchronizeBuffer(IMemoryAllocator* allocator)
348{
349 auto& free_map = m_buffer_list->m_free_map;
350 auto x = free_map.find(allocator);
352 // Regarde si un buffer est disponible dans \a free_map.
353 if (x == free_map.end()) {
354 // Aucune buffer associé à cet allocator, on en créé un
355 new_buffer = MemoryBuffer::create(allocator);
356 }
357 else {
358 auto& buffer_stack = x->second;
359 // Si la pile est vide, on créé un buffer. Sinon on prend le premier
360 // de la pile.
361 if (buffer_stack.empty()) {
362 new_buffer = MemoryBuffer::create(allocator);
363 }
364 else {
365 new_buffer = buffer_stack.top();
366 buffer_stack.pop();
367 }
368 }
369
370 // Enregistre l'instance dans la liste utilisée
371 m_buffer_list->m_used_map[allocator].insert(std::make_pair(new_buffer.get(), new_buffer));
372 return new_buffer;
373}
374
375/*---------------------------------------------------------------------------*/
376/*---------------------------------------------------------------------------*/
377
378void VariableSynchronizerMng::InternalApi::
379releaseSynchronizeBuffer(IMemoryAllocator* allocator, MemoryBuffer* v)
380{
381 auto& main_map = m_buffer_list->m_used_map;
382 auto x = main_map.find(allocator);
383 if (x == main_map.end())
384 ARCANE_FATAL("Invalid allocator '{0}'", allocator);
385
386 auto& sub_map = x->second;
387 auto x2 = sub_map.find(v);
388 if (x2 == sub_map.end())
389 ARCANE_FATAL("Invalid buffer '{0}'", v);
390
391 Ref<MemoryBuffer> ref_memory = x2->second;
392
393 sub_map.erase(x2);
394
395 m_buffer_list->m_free_map[allocator].push(ref_memory);
396}
397
398/*---------------------------------------------------------------------------*/
399/*---------------------------------------------------------------------------*/
400
402dumpStats(std::ostream& ostr) const
403{
405 for (const auto& x : m_buffer_list->m_used_map)
406 ostr << "SynchronizeBuffer: nb_used_map = " << x.second.size() << "\n";
407
409 for (const auto& x : m_buffer_list->m_free_map)
410 ostr << "SynchronizeBuffer: nb_free_map = " << x.second.size() << "\n";
411}
412
413/*---------------------------------------------------------------------------*/
414/*---------------------------------------------------------------------------*/
415
416} // End namespace Arcane
417
418/*---------------------------------------------------------------------------*/
419/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
virtual char reduce(eReduceType rt, char v)=0
Effectue la réduction de type rt sur le réel v et retourne la valeur.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:120
Flot de sortie lié à une String.
Arguments de l'évènement notifiant une synchronisation de variables.
CompareStatus
Comparaison des valeurs des entités fantômes avant/après une synchronisation.
@ Different
Valeurs différentes avant et après la synchronisation.
@ Same
Même valeurs avant et après la synchronisation.
Gère un pool de buffer associé à un allocateur.
FreeList m_free_map
Liste par allocateur des buffers libres.
MapList m_used_map
Liste par allocateur des buffers en cours d'utilisation.
Gestionnaire des synchroniseurs de variables.
EventObservable< const VariableSynchronizerEventArgs & > & onSynchronized() override
Évènement envoyé en début et fin de synchronisation.
Int32 synchronizationCompareLevel() const final
Niveau de comparaison des valeurs avant et après synchronisation.
void dumpStats(std::ostream &ostr) const override
Affiche les statistiques sur le flot ostr.
void flushPendingStats() override
Traite les statistiques en cours.
IParallelMng * parallelMng() const override
Gestionnaire de parallélisme associé
Statistiques de synchronisation.
Integer size() const
Nombre d'éléments du vecteur.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
void clear()
Supprime les éléments du tableau.
ITraceMng * traceMng() const
Gestionnaire de trace.
TraceMessage info() const
Flot pour un message d'information.
Formattage du flot en longueur.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Represents an in-memory output byte stream.