14#include "arcane/impl/internal/VariableSynchronizerMng.h"
16#include "arcane/utils/PlatformUtils.h"
17#include "arcane/utils/ValueConvert.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/OStringStream.h"
20#include "arcane/utils/internal/MemoryBuffer.h"
22#include "arcane/core/IVariableMng.h"
23#include "arcane/core/IParallelMng.h"
24#include "arcane/core/VariableSynchronizerEventArgs.h"
25#include "arcane/core/IVariable.h"
// Comparison status codes stored (as unsigned char) in
// m_pending_compare_status_list, one per synchronized variable.
// The numeric ordering UNKNOWN < DIFF < SAME is significant: these codes are
// passed to IParallelMng::reduce() with ReduceMax (in _handleEvent) and
// ReduceMin (in flushPendingStats), so changing a value changes the outcome
// of those reductions.
64 static constexpr unsigned char LOCAL_UNKNOWN = 0;
65 static constexpr unsigned char LOCAL_DIFF = 1;
66 static constexpr unsigned char LOCAL_SAME = 2;
77 m_nb_same += x.m_nb_same;
78 m_nb_different += x.m_nb_different;
79 m_nb_unknown += x.m_nb_unknown;
86 Int32 m_nb_different = 0;
87 Int32 m_nb_unknown = 0;
94 , m_variable_synchronizer_mng(
vsm)
101 if (m_is_event_registered)
106 m_variable_synchronizer_mng->
onSynchronized().attach(m_observer_pool, handler);
107 m_is_event_registered =
true;
110 void flushPendingStats(IParallelMng* pm);
112 Int32 dumpStats(std::ostream& ostr)
114 std::streamsize old_precision = ostr.precision(20);
115 ostr <<
"Synchronization Stats\n";
128 for (
const auto& p : m_stats) {
129 total_stat.add(p.second);
145 ostr.precision(old_precision);
146 return total_stat.m_count;
151 VariableSynchronizerMng* m_variable_synchronizer_mng =
nullptr;
152 EventObserverPool m_observer_pool;
153 std::map<String, StatInfo> m_stats;
154 bool m_is_event_registered =
false;
155 UniqueArray<String> m_pending_variable_name_list;
156 UniqueArray<unsigned char> m_pending_compare_status_list;
160 void _handleEvent(
const VariableSynchronizerEventArgs& args);
// Handler for synchronization events. For every variable carried by `args`
// it queues the variable's full name and a local comparison status code in
// the two pending lists, to be consumed later by flushPendingStats().
// NOTE: this listing omits several original lines (see the gaps in the
// numbering), so the statements guarded by some conditions are not visible.
163void VariableSynchronizerStats::
164_handleEvent(
const VariableSynchronizerEventArgs& args)
// Tests whether the event state is something other than EndSynchronize
// (the guarded statement is not visible here; presumably an early return
// -- TODO confirm).
167 if (args.state() != VariableSynchronizerEventArgs::State::EndSynchronize)
// Tests whether the manager has statistics disabled (guarded statement not
// visible; presumably an early return -- TODO confirm).
169 if (!m_variable_synchronizer_mng->isDoingStats())
172 IParallelMng* pm = m_variable_synchronizer_mng->
parallelMng();
173 auto compare_status_list = args.compareStatusList();
// One (name, status) pair is appended to the pending lists per variable.
176 for (IVariable* var : args.variables()) {
177 m_pending_variable_name_list.
add(var->fullName());
// Starts as UNKNOWN; original lines 180-183 (not visible) presumably refine
// it from compare_status_list -- TODO confirm.
179 unsigned char rs = LOCAL_UNKNOWN;
184 m_pending_compare_status_list.
add(rs);
// NOTE(review): assuming reduce() combines the values of all ranks of `pm`,
// ReduceMax yields LOCAL_SAME as soon as ONE rank reports LOCAL_SAME (it is
// the largest code), whereas flushPendingStats() uses ReduceMin, which
// yields LOCAL_SAME only when ALL ranks report it. Confirm this difference
// is intended. This is also a collective call issued once per variable.
189 unsigned char global_rs = pm->
reduce(Parallel::ReduceMax, rs);
// Traces the variable name and the current call stack when the reduced
// status is "same".
190 if (global_rs == LOCAL_SAME) {
191 info() <<
"Synchronize: same values for variable name=" << var->fullName();
193 info() <<
"Stack=" << platform::getStackTrace();
// Consumes the (variable name, status) pairs queued by _handleEvent():
// reduces the status codes through `pm`, looks up the per-variable entry in
// m_stats for each pair, then empties both pending lists.
// NOTE: this listing omits several original lines (see the gaps in the
// numbering); the statements selected by the status tests are not visible.
203void VariableSynchronizerStats::
204flushPendingStats(IParallelMng* pm)
206 Int32 nb_pending = m_pending_variable_name_list.
size();
// Consistency check: the local number of pending entries must equal the
// maximum obtained through `pm`; otherwise ARCANE_FATAL throws a
// FatalErrorException reporting both counts.
207 Int32 total_nb_pending = pm->
reduce(Parallel::ReduceMax, nb_pending);
208 if (total_nb_pending != nb_pending)
209 ARCANE_FATAL(
"Bad number of pending stats local={0} global={1}", nb_pending, total_nb_pending);
// In-place ReduceMin over the whole status list (array overload not visible
// here -- presumably element-wise). With UNKNOWN < DIFF < SAME, an entry
// remains LOCAL_SAME only if no smaller code was contributed for it.
210 pm->
reduce(Parallel::ReduceMin, m_pending_compare_status_list);
211 for (
Int32 i = 0; i < total_nb_pending; ++i) {
212 unsigned char rs = m_pending_compare_status_list[i];
// std::map::operator[] creates the StatInfo entry the first time a
// variable name is encountered.
213 auto& v = m_stats[m_pending_variable_name_list[i]];
// The statements executed for each status are not visible in this listing;
// presumably they update the same/different/unknown counters of `v`
// -- TODO confirm.
214 if (rs == LOCAL_SAME)
216 else if (rs == LOCAL_DIFF)
// Both lists are emptied so that the next flush only sees new events.
222 m_pending_variable_name_list.clear();
223 m_pending_compare_status_list.
clear();
// Builds the manager from the variable manager `vm`: the trace manager and
// the parallel manager are taken from `vm`, and a VariableSynchronizerStats
// bound to this instance is allocated. Two environment variables are then
// read to configure comparison level and statistics.
// NOTE: this listing omits some original lines (see the numbering gaps), so
// the exact brace structure around the statements below is not visible.
232VariableSynchronizerMng::
233VariableSynchronizerMng(IVariableMng* vm)
234: TraceAccessor(vm->traceMng())
236, m_parallel_mng(vm->parallelMng())
237, m_stats(new VariableSynchronizerStats(this))
// ARCANE_AUTO_COMPARE_SYNCHRONIZE: when it yields an Int32, that value
// becomes the synchronization compare level.
239 if (
auto v = Convert::Type<Int32>::tryParseFromEnvironment(
"ARCANE_AUTO_COMPARE_SYNCHRONIZE",
true)) {
240 m_synchronize_compare_level = v.value();
// Statistics are enabled when the compare level is strictly positive.
242 m_is_doing_stats = m_synchronize_compare_level > 0;
// ARCANE_SYNCHRONIZE_STATS: when it yields an Int32, it has the final say
// on statistics (non-zero enables, zero disables), replacing the value
// assigned above.
244 if (
auto v = Convert::Type<Int32>::tryParseFromEnvironment(
"ARCANE_SYNCHRONIZE_STATS",
true))
245 m_is_doing_stats = (v.value() != 0);
251VariableSynchronizerMng::
252~VariableSynchronizerMng()
260void VariableSynchronizerMng::
272 if (!m_parallel_mng->isParallel())
276 Int32 count = m_stats->dumpStats(
ostr2());
280 m_internal_api.dumpStats(
ostr);
290 m_stats->flushPendingStats(m_parallel_mng);
302 using MemoryBufferMap = std::map<MemoryBuffer*, Ref<MemoryBuffer>>;
303 using MapList = std::map<IMemoryAllocator*, MemoryBufferMap>;
305 using FreeList = std::map<IMemoryAllocator*, std::stack<Ref<MemoryBuffer>>>;
319VariableSynchronizerMng::InternalApi::
322, m_synchronizer_mng(
vms)
330VariableSynchronizerMng::InternalApi::
333 delete m_buffer_list;
358 auto& buffer_stack = x->second;
361 if (buffer_stack.empty()) {
362 new_buffer = MemoryBuffer::create(allocator);
365 new_buffer = buffer_stack.top();
371 m_buffer_list->
m_used_map[allocator].insert(std::make_pair(new_buffer.get(), new_buffer));
378void VariableSynchronizerMng::InternalApi::
379releaseSynchronizeBuffer(IMemoryAllocator* allocator,
MemoryBuffer* v)
382 auto x = main_map.find(allocator);
383 if (x == main_map.end())
386 auto& sub_map = x->second;
387 auto x2 = sub_map.find(v);
388 if (x2 == sub_map.end())
391 Ref<MemoryBuffer> ref_memory = x2->second;
395 m_buffer_list->
m_free_map[allocator].push(ref_memory);
405 for (
const auto& x : m_buffer_list->
m_used_map)
406 ostr <<
"SynchronizeBuffer: nb_used_map = " << x.second.size() <<
"\n";
409 for (
const auto& x : m_buffer_list->
m_free_map)
410 ostr <<
"SynchronizeBuffer: nb_free_map = " << x.second.size() <<
"\n";
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
virtual char reduce(eReduceType rt, char v)=0
Effectue la réduction de type rt sur la valeur v (de type char) et retourne le résultat.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Flot de sortie lié à une String.
Arguments de l'évènement notifiant une synchronisation de variables.
CompareStatus
Comparaison des valeurs des entités fantômes avant/après une synchronisation.
@ Different
Valeurs différentes avant et après la synchronisation.
@ Same
Mêmes valeurs avant et après la synchronisation.
Gère un pool de buffer associé à un allocateur.
FreeList m_free_map
Liste par allocateur des buffers libres.
MapList m_used_map
Liste par allocateur des buffers en cours d'utilisation.
void dumpStats(std::ostream &ostr) const
Gestionnaire des synchroniseurs de variables.
EventObservable< const VariableSynchronizerEventArgs & > & onSynchronized() override
Évènement envoyé en début et fin de synchronisation.
Int32 synchronizationCompareLevel() const final
Niveau de comparaison des valeurs avant et après synchronisation.
void dumpStats(std::ostream &ostr) const override
Affiche les statistiques sur le flot ostr.
void flushPendingStats() override
Traite les statistiques en cours.
IParallelMng * parallelMng() const override
Gestionnaire de parallélisme associé
Statistiques de synchronisation.
Integer size() const
Nombre d'éléments du vecteur.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
void clear()
Supprime les éléments du tableau.
Interface d'un allocateur pour la mémoire.
Classe d'accès aux traces.
ITraceMng * traceMng() const
Gestionnaire de trace.
TraceMessage info() const
Flot pour un message d'information.
Formatage du flot en longueur.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Represents an in-memory output byte stream.