Arcane  v3.15.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
BasicReader.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* BasicReader.cc (C) 2000-2024 */
9/* */
10/* Lecture simple pour les protections/reprises. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/std/internal/BasicReader.h"
15
16#include "arcane/utils/PlatformUtils.h"
17#include "arcane/utils/JSONReader.h"
18#include "arcane/utils/Ref.h"
19
20#include "arcane/core/IParallelMng.h"
21#include "arcane/core/IIOMng.h"
22#include "arcane/core/IData.h"
23#include "arcane/core/ItemGroup.h"
24#include "arcane/core/IVariable.h"
25#include "arcane/core/SerializeBuffer.h"
26
27#include "arcane/std/internal/ParallelDataReader.h"
28
29/*---------------------------------------------------------------------------*/
30/*---------------------------------------------------------------------------*/
31
32namespace Arcane::impl
33{
34
35/*---------------------------------------------------------------------------*/
36/*---------------------------------------------------------------------------*/
37
38BasicReader::
39BasicReader(IApplication* app, IParallelMng* pm, Int32 forced_rank_to_read,
40 const String& path, bool want_parallel)
41: BasicReaderWriterCommon(app, pm, path, BasicReaderWriterCommon::OpenModeRead)
42, m_want_parallel(want_parallel)
43, m_nb_written_part(0)
44, m_version(-1)
45, m_first_rank_to_read(0)
46, m_nb_rank_to_read(0)
47, m_forced_rank_to_read(forced_rank_to_read)
48, m_item_group_finder(nullptr)
49{
50}
51
52/*---------------------------------------------------------------------------*/
53/*---------------------------------------------------------------------------*/
54
55void BasicReader::
56initialize()
57{
58 info() << "BasicReader::initialize()";
59
60 IParallelMng* pm = m_parallel_mng;
61 // Si un fichier 'arcane_acr_db.json' existe alors on lit les informations
62 // de ce fichier pour détecter entre autre le numéro de version. Il faut
63 // le faire avant de lire les informations telles que les méta-données
64 // de la protection car l'emplacement des ces dernières dépend de la version
65 String db_filename = String::concat(m_path, "/arcane_acr_db.json");
66 Int32 has_db_file = 0;
67 {
68 if (pm->isMasterIO())
69 has_db_file = platform::isFileReadable(db_filename) ? 1 : 0;
70 pm->broadcast(Int32ArrayView(1, &has_db_file), pm->masterIORank());
71 }
72 String data_compressor_name;
73 String hash_algorithm_name;
74 String comparison_hash_algorithm_name;
75 if (has_db_file) {
76 UniqueArray<Byte> bytes;
77 pm->ioMng()->collectiveRead(db_filename, bytes, false);
78 JSONDocument json_doc;
79 json_doc.parse(bytes, db_filename);
80 JSONValue root = json_doc.root();
81 JSONValue jv_arcane_db = root.expectedChild(_getArcaneDBTag());
82 m_version = jv_arcane_db.expectedChild("Version").valueAsInt32();
83 m_nb_written_part = jv_arcane_db.expectedChild("NbPart").valueAsInt32();
84 data_compressor_name = jv_arcane_db.child("DataCompressor").value();
85 hash_algorithm_name = jv_arcane_db.child("HashAlgorithm").value();
86 comparison_hash_algorithm_name = jv_arcane_db.child("ComparisonHashAlgorithm").value();
87 info() << "**--** Begin read using database version=" << m_version
88 << " nb_part=" << m_nb_written_part
89 << " compressor=" << data_compressor_name
90 << " hash_algorithm=" << hash_algorithm_name
91 << " comparison_hash_algorithm=" << comparison_hash_algorithm_name;
92 }
93 else {
94 // Ancien format
95 // Le proc maitre lit le fichier 'infos.txt' et envoie les informations
96 // aux autres. Ce format ne permet d'avoir que le nombre de parties
97 // comme information.
98 if (pm->isMasterIO()) {
99 Integer nb_part = 0;
100 String filename = String::concat(m_path, "/infos.txt");
101 std::ifstream ifile(filename.localstr());
102 ifile >> nb_part;
103 info(4) << "** NB PART=" << nb_part;
104 m_nb_written_part = nb_part;
105 }
106 pm->broadcast(Int32ArrayView(1, &m_nb_written_part), pm->masterIORank());
107 }
108 if (m_version >= 3) {
109 Int32 rank_to_read = m_forced_rank_to_read;
110 if (rank_to_read < 0) {
111 if (m_nb_written_part > 1)
112 rank_to_read = pm->commRank();
113 else
114 rank_to_read = 0;
115 }
116 String main_filename = _getBasicVariableFile(m_version, m_path, rank_to_read);
117 m_forced_rank_to_read_text_reader = makeRef(new KeyValueTextReader(traceMng(), main_filename, m_version));
118 if (!data_compressor_name.empty()) {
119 Ref<IDataCompressor> dc = _createDeflater(m_application, data_compressor_name);
120 m_forced_rank_to_read_text_reader->setDataCompressor(dc);
121 }
122 if (!hash_algorithm_name.empty()) {
123 Ref<IHashAlgorithm> v = _createHashAlgorithm(m_application, hash_algorithm_name);
124 m_forced_rank_to_read_text_reader->setHashAlgorithm(v);
125 }
126 if (!comparison_hash_algorithm_name.empty()) {
127 Ref<IHashAlgorithm> v = _createHashAlgorithm(m_application, comparison_hash_algorithm_name);
128 m_comparison_hash_algorithm = v;
129 }
130 }
131}
132
133/*---------------------------------------------------------------------------*/
134/*---------------------------------------------------------------------------*/
135
136void BasicReader::
137_directReadVal(VariableMetaData* varmd, IData* data)
138{
139 info(4) << "DIRECT READ VAL v=" << varmd->fullName();
140
141 bool is_item_variable = !varmd->itemFamilyName().null();
142 Int32 nb_rank_to_read = m_nb_rank_to_read;
143 // S'il s'agit d'une variable qui n'est pas sur le maillage,
144 // il ne faut lire qu'un seul rang car il n'est pas
145 // certain que cette variable soit definie partout
146 if (!is_item_variable)
147 if (nb_rank_to_read > 1)
148 nb_rank_to_read = 1;
149
150 UniqueArray<Ref<IData>> allocated_data;
151 UniqueArray<IData*> written_data(nb_rank_to_read);
152
153 for (Integer i = 0; i < nb_rank_to_read; ++i) {
154 written_data[i] = data;
155 if ((nb_rank_to_read > 1 || m_want_parallel) && is_item_variable) {
156 Ref<IData> new_data = data->cloneEmptyRef();
157 written_data[i] = new_data.get();
158 allocated_data.add(new_data);
159 }
160 String vname = varmd->fullName();
161 info(4) << " TRY TO READ var_full_name=" << vname;
162 m_global_readers[i]->readData(vname, written_data[i]);
163 if (i==0 && m_comparison_hash_algorithm.get() )
164 info(5) << "COMPARISON_HASH =" << m_global_readers[i]->comparisonHashValue(vname);
165 }
166
167 if (is_item_variable) {
168 Ref<ParallelDataReader> parallel_data_reader = _getReader(varmd);
169
170 Int64UniqueArray full_written_unique_ids;
171 IData* full_written_data = nullptr;
172
173 if (nb_rank_to_read == 0) {
174 // Rien à lire
175 // Il faut tout de même passer dans le reader parallèle
176 // pour assurer les opérations collectives
177 full_written_data = nullptr;
178 }
179 else if (nb_rank_to_read == 1) {
180 //full_written_unique_ids = written_unique_ids[0];
181 full_written_data = written_data[0];
182 }
183 else {
184 // Il faut créer une donnée qui contient l'union des written_data
185 Ref<IData> allocated_written_data = data->cloneEmptyRef();
186 allocated_data.add(allocated_written_data);
187 full_written_data = allocated_written_data.get();
188 SerializeBuffer sbuf;
189 sbuf.setMode(ISerializer::ModeReserve);
190 for (Int32 i = 0; i < nb_rank_to_read; ++i)
191 written_data[i]->serialize(&sbuf, nullptr);
192 sbuf.allocateBuffer();
193 sbuf.setMode(ISerializer::ModePut);
194 for (Int32 i = 0; i < nb_rank_to_read; ++i)
195 written_data[i]->serialize(&sbuf, nullptr);
196 sbuf.setMode(ISerializer::ModeGet);
197 sbuf.setReadMode(ISerializer::ReadAdd);
198 for (Int32 i = 0; i < nb_rank_to_read; ++i)
199 full_written_data->serialize(&sbuf, nullptr);
200 }
201 if (data != full_written_data) {
202 info(5) << "PARALLEL READ";
203 parallel_data_reader->getSortedValues(full_written_data, data);
204 }
205 }
206}
207
208/*---------------------------------------------------------------------------*/
209/*---------------------------------------------------------------------------*/
210
211Ref<ParallelDataReader> BasicReader::
212_getReader(VariableMetaData* varmd)
213{
214 Int32 nb_to_read = m_nb_rank_to_read;
215
216 // Pour la lecture, lors d'une reprise, le groupe (var->itemGroup())
217 // associé à la variable ainsi que la famille (var->itemFamily())
218 // n'existe pas encore. Il ne faut donc pas l'utiliser
219 const String& var_group_name = varmd->itemGroupName();
220 String group_full_name = varmd->meshName() + "_" + varmd->itemFamilyName() + "_" + var_group_name;
221 auto ix = m_parallel_data_readers.find(group_full_name);
222 if (ix != m_parallel_data_readers.end())
223 return ix->second;
224
225 IParallelMng* pm = m_parallel_mng;
226 Ref<ParallelDataReader> reader = makeRef(new ParallelDataReader(pm));
227 {
228 UniqueArray<SharedArray<Int64>> written_unique_ids(nb_to_read);
229 Int64Array& wanted_unique_ids = reader->wantedUniqueIds();
230 for (Integer i = 0; i < nb_to_read; ++i) {
231 m_global_readers[i]->readItemGroup(group_full_name, written_unique_ids[i], wanted_unique_ids);
232 }
233 // Cela ne doit être actif que pour les comparaisons (pas en reprise car les groupes
234 // ne sont pas valides)
235 if (m_item_group_finder) {
236 ItemGroup ig = m_item_group_finder->getWantedGroup(varmd);
237 _fillUniqueIds(ig, wanted_unique_ids);
238 }
239 for (Integer i = 0; i < nb_to_read; ++i) {
240 Integer nb_uid = written_unique_ids[i].size();
241 if (nb_uid >= 1)
242 info(5) << "PART I=" << i
243 << " nb_uid=" << nb_uid
244 << " min_uid=" << written_unique_ids[i][0]
245 << " max_uid=" << written_unique_ids[i][nb_uid - 1];
246 }
247 Int64Array& full_written_unique_ids = reader->writtenUniqueIds();
248 for (Integer i = 0; i < nb_to_read; ++i)
249 full_written_unique_ids.addRange(written_unique_ids[i]);
250 info(5) << "FULL UID SIZE=" << full_written_unique_ids.size();
251 if (m_want_parallel)
252 reader->sort();
253 }
254 m_parallel_data_readers.insert(std::make_pair(group_full_name, reader));
255 return reader;
256}
257
258/*---------------------------------------------------------------------------*/
259/*---------------------------------------------------------------------------*/
265void BasicReader::
266fillComparisonHash(std::map<String, String>& comparison_hash_map)
267{
268 comparison_hash_map.clear();
269 if (m_nb_rank_to_read==0)
270 return;
271 if (m_parallel_mng->commRank()!=m_parallel_mng->masterIORank())
272 return;
273 const VariableDataInfoMap& var_map = m_global_readers[0]->variablesDataInfoMap();
274 for( auto v : var_map){
275 VariableDataInfo* vd = v.second.get();
276 comparison_hash_map.try_emplace(vd->fullName(),vd->comparisonHashValue());
277 }
278}
279
280/*---------------------------------------------------------------------------*/
281/*---------------------------------------------------------------------------*/
282
283void BasicReader::
284read(IVariable* var, IData* data)
285{
286 info(4) << "MASTER READ var=" << var->fullName() << " data=" << data;
287 if (var->isPartial()) {
288 info() << "** WARNING: partial variable not implemented in BasicReaderWriter";
289 return;
290 }
291 Ref<VariableMetaData> vmd(var->createMetaDataRef());
292 _directReadVal(vmd.get(), data);
293}
294
295/*---------------------------------------------------------------------------*/
296/*---------------------------------------------------------------------------*/
297
298void BasicReader::
299read(const VariableDataReadInfo& infos)
300{
301 VariableMetaData* vmd = infos.variableMetaData();
302 IData* data = infos.data();
303 info(4) << "MASTER2 READ var=" << vmd->fullName() << " data=" << data;
304 if (vmd->isPartial()) {
305 info() << "** WARNING: partial variable not implemented in BasicReaderWriter";
306 return;
307 }
308 _directReadVal(vmd, data);
309}
310
311/*---------------------------------------------------------------------------*/
312/*---------------------------------------------------------------------------*/
313
314String BasicReader::
315metaData()
316{
317 ByteUniqueArray bytes;
318 fillMetaData(bytes);
319 String s(bytes);
320 info(5) << " S=" << s << '\n';
321 return s;
322}
323
324/*---------------------------------------------------------------------------*/
325/*---------------------------------------------------------------------------*/
326
327void BasicReader::
328fillMetaData(ByteArray& bytes)
329{
330 Int32 rank = m_parallel_mng->commRank();
331 if (m_forced_rank_to_read >= 0)
332 rank = m_forced_rank_to_read;
333 if (m_version >= 3) {
334 Int64 meta_data_size = 0;
335 String key_name = "Global:CheckpointMetadata";
336 info(4) << "Reading checkpoint metadata from database";
337 m_forced_rank_to_read_text_reader->getExtents(key_name, Int64ArrayView(1, &meta_data_size));
338 bytes.resize(meta_data_size);
339 m_forced_rank_to_read_text_reader->read(key_name, asWritableBytes(bytes.span()));
340 }
341 else {
342 String filename = _getMetaDataFileName(rank);
343 info(4) << "Reading checkpoint metadata file=" << filename;
344 platform::readAllFile(filename, false, bytes);
345 }
346}
347
348/*---------------------------------------------------------------------------*/
349/*---------------------------------------------------------------------------*/
350
351void BasicReader::
352_setRanksToRead()
353{
354 IParallelMng* pm = m_parallel_mng;
355 Int32 my_rank = pm->commRank();
356 Int32 nb_rank = pm->commSize();
357 Int32 first_to_read = my_rank;
358 Int32 nb_to_read = 1;
359
360 if (m_forced_rank_to_read >= 0) {
361 m_first_rank_to_read = m_forced_rank_to_read;
362 m_nb_rank_to_read = 1;
363 return;
364 }
365 if (nb_rank == 1) {
366 nb_to_read = m_nb_written_part;
367 }
368 else if (nb_rank < m_nb_written_part) {
369 // Il y a plus de fichiers que de processeurs.
370 // Un des processeurs doit donc lire au moins deux fichiers.
371 // Pour ceux dont c'est le cas, il faut que ces fichiers
372 // soient consécutifs pour que l'intervalle des uniqueId()
373 // des entités du processeur soient consécutifs
374 Int32 nb_part_per_rank = m_nb_written_part / nb_rank;
375 Int32 remaining_nb_part = m_nb_written_part - (nb_part_per_rank * nb_rank);
376 first_to_read = nb_part_per_rank * my_rank;
377 nb_to_read = nb_part_per_rank;
378 info(4) << "NB_PART_PER_RANK = " << nb_part_per_rank
379 << " REMAINING=" << remaining_nb_part;
380 if (my_rank >= remaining_nb_part) {
381 first_to_read += remaining_nb_part;
382 }
383 else {
384 first_to_read += my_rank;
385 ++nb_to_read;
386 }
387 }
388 else {
389 if (my_rank >= m_nb_written_part) {
390 // Aucune partie à lire
391 nb_to_read = 0;
392 }
393 }
394
395 m_first_rank_to_read = first_to_read;
396 m_nb_rank_to_read = nb_to_read;
397}
398
399/*---------------------------------------------------------------------------*/
400/*---------------------------------------------------------------------------*/
401
402Ref<IGenericReader> BasicReader::
403_readOwnMetaDataAndCreateReader(Int32 rank)
404{
405 String main_filename = _getBasicVariableFile(m_version, m_path, rank);
406 Ref<KeyValueTextReader> text_reader;
407 if (m_version >= 3) {
408 // Si le rang est le même que m_forced_rank_to_read, alors on peut réutiliser
409 // le lecteur déjà créé.
410 if (rank == m_forced_rank_to_read)
411 text_reader = m_forced_rank_to_read_text_reader;
412 else {
413 text_reader = makeRef(new KeyValueTextReader(traceMng(), main_filename, m_version));
414 // Il faut que ce lecteur ait le même gestionnaire de compression
415 // que celui déjà créé
416 text_reader->setDataCompressor(m_forced_rank_to_read_text_reader->dataCompressor());
417 text_reader->setHashAlgorithm(m_forced_rank_to_read_text_reader->hashAlgorithm());
418 }
419 }
420
421 auto r = makeRef<IGenericReader>(new BasicGenericReader(m_application, m_version, text_reader));
422 r->initialize(m_path, rank);
423 return r;
424}
425
426/*---------------------------------------------------------------------------*/
427/*---------------------------------------------------------------------------*/
428
429void BasicReader::
430beginRead(const VariableCollection& vars)
431{
432 ARCANE_UNUSED(vars);
433 beginRead(DataReaderInfo());
434}
435
436/*---------------------------------------------------------------------------*/
437/*---------------------------------------------------------------------------*/
438
439void BasicReader::
440beginRead(const DataReaderInfo& infos)
441{
442 ARCANE_UNUSED(infos);
443 info(4) << "** ** BEGIN READ";
444
445 _setRanksToRead();
446 info(4) << "RanksToRead: FIRST TO READ =" << m_first_rank_to_read
447 << " nb=" << m_nb_rank_to_read << " version=" << m_version;
448 m_global_readers.resize(m_nb_rank_to_read);
449 for (Integer i = 0; i < m_nb_rank_to_read; ++i)
450 m_global_readers[i] = _readOwnMetaDataAndCreateReader(i + m_first_rank_to_read);
451}
452
453/*---------------------------------------------------------------------------*/
454/*---------------------------------------------------------------------------*/
455
456} // namespace Arcane::impl
457
458/*---------------------------------------------------------------------------*/
459/*---------------------------------------------------------------------------*/
Tableau d'items de types quelconques.
Informations de relecture des données.
Interface d'une donnée.
Definition IData.h:33
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual bool isMasterIO() const =0
true si l'instance est un gestionnaire maître des entrées/sorties.
virtual Int32 commSize() const =0
Nombre d'instance dans le communicateur.
virtual IIOMng * ioMng() const =0
Gestionnaire des entrées/sorties.
virtual Integer masterIORank() const =0
Rang de l'instance gérant les entrées/sorties (pour laquelle isMasterIO() est vrai)
Interface d'une variable.
Definition IVariable.h:54
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:149
Informations de relecture des données d'une variable.
Meta-données sur une variable.
Tableau associatif des données des variables.
Informations sur les données d'une variable.
Vue modifiable d'un tableau d'un type T.
void addRange(ConstReferenceType val, Int64 n)
Ajoute n élément de valeur val à la fin du tableau.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
Chaîne de caractères unicode.
Vecteur 1D de données avec sémantique par valeur (style STL).
Array< Int64 > Int64Array
Tableau dynamique à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:336
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:550
ArrayView< Int32 > Int32ArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:664
Int32 Integer
Type représentant un entier.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.