Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
BasicReader.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* BasicReader.cc (C) 2000-2024 */
9/* */
10/* Simple reader for checkpoints/restarts. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/std/internal/BasicReader.h"
15
16#include "arcane/utils/PlatformUtils.h"
17#include "arcane/utils/JSONReader.h"
18#include "arcane/utils/Ref.h"
19
20#include "arcane/core/IParallelMng.h"
21#include "arcane/core/IIOMng.h"
22#include "arcane/core/IData.h"
23#include "arcane/core/ItemGroup.h"
24#include "arcane/core/IVariable.h"
25#include "arcane/core/SerializeBuffer.h"
26
27#include "arcane/std/internal/ParallelDataReader.h"
28
29/*---------------------------------------------------------------------------*/
30/*---------------------------------------------------------------------------*/
31
32namespace Arcane::impl
33{
34
35/*---------------------------------------------------------------------------*/
36/*---------------------------------------------------------------------------*/
37
38BasicReader::
39BasicReader(IApplication* app, IParallelMng* pm, Int32 forced_rank_to_read,
40 const String& path, bool want_parallel)
41: BasicReaderWriterCommon(app, pm, path, BasicReaderWriterCommon::OpenModeRead)
42, m_want_parallel(want_parallel)
43, m_nb_written_part(0)
44, m_version(-1)
45, m_first_rank_to_read(0)
46, m_nb_rank_to_read(0)
47, m_forced_rank_to_read(forced_rank_to_read)
48, m_item_group_finder(nullptr)
49{
50}
51
52/*---------------------------------------------------------------------------*/
53/*---------------------------------------------------------------------------*/
54
55void BasicReader::
56initialize()
57{
58 info() << "BasicReader::initialize()";
59
60 IParallelMng* pm = m_parallel_mng;
61 // If an 'arcane_acr_db.json' file exists, read the information
62 // from this file to detect, among other things, the version number.
63 // This must be done before reading information such as the metadata
64 // of the checkpoint because its location depends on the version.
65 String db_filename = String::concat(m_path, "/arcane_acr_db.json");
66 Int32 has_db_file = 0;
67 {
68 if (pm->isMasterIO())
69 has_db_file = platform::isFileReadable(db_filename) ? 1 : 0;
70 pm->broadcast(Int32ArrayView(1, &has_db_file), pm->masterIORank());
71 }
72 String data_compressor_name;
73 String hash_algorithm_name;
74 String comparison_hash_algorithm_name;
75 if (has_db_file) {
76 UniqueArray<Byte> bytes;
77 pm->ioMng()->collectiveRead(db_filename, bytes, false);
78 JSONDocument json_doc;
79 json_doc.parse(bytes, db_filename);
80 JSONValue root = json_doc.root();
81 JSONValue jv_arcane_db = root.expectedChild(_getArcaneDBTag());
82 m_version = jv_arcane_db.expectedChild("Version").valueAsInt32();
83 m_nb_written_part = jv_arcane_db.expectedChild("NbPart").valueAsInt32();
84 data_compressor_name = jv_arcane_db.child("DataCompressor").value();
85 hash_algorithm_name = jv_arcane_db.child("HashAlgorithm").value();
86 comparison_hash_algorithm_name = jv_arcane_db.child("ComparisonHashAlgorithm").value();
87 info() << "**--** Begin read using database version=" << m_version
88 << " nb_part=" << m_nb_written_part
89 << " compressor=" << data_compressor_name
90 << " hash_algorithm=" << hash_algorithm_name
91 << " comparison_hash_algorithm=" << comparison_hash_algorithm_name;
92 }
93 else {
94 // Old format
95 // The master process reads the 'infos.txt' file and sends the information
96 // to the others. This format only allows the number of parts
97 // as information.
98 if (pm->isMasterIO()) {
99 Integer nb_part = 0;
100 String filename = String::concat(m_path, "/infos.txt");
101 std::ifstream ifile(filename.localstr());
102 ifile >> nb_part;
103 info(4) << "** NB PART=" << nb_part;
104 m_nb_written_part = nb_part;
105 }
106 pm->broadcast(Int32ArrayView(1, &m_nb_written_part), pm->masterIORank());
107 }
108 if (m_version >= 3) {
109 Int32 rank_to_read = m_forced_rank_to_read;
110 if (rank_to_read < 0) {
111 if (m_nb_written_part > 1)
112 rank_to_read = pm->commRank();
113 else
114 rank_to_read = 0;
115 }
116 String main_filename = _getBasicVariableFile(m_version, m_path, rank_to_read);
117 m_forced_rank_to_read_text_reader = makeRef(new KeyValueTextReader(traceMng(), main_filename, m_version));
118 if (!data_compressor_name.empty()) {
119 Ref<IDataCompressor> dc = _createDeflater(m_application, data_compressor_name);
120 m_forced_rank_to_read_text_reader->setDataCompressor(dc);
121 }
122 if (!hash_algorithm_name.empty()) {
123 Ref<IHashAlgorithm> v = _createHashAlgorithm(m_application, hash_algorithm_name);
124 m_forced_rank_to_read_text_reader->setHashAlgorithm(v);
125 }
126 if (!comparison_hash_algorithm_name.empty()) {
127 Ref<IHashAlgorithm> v = _createHashAlgorithm(m_application, comparison_hash_algorithm_name);
128 m_comparison_hash_algorithm = v;
129 }
130 }
131}
132
133/*---------------------------------------------------------------------------*/
134/*---------------------------------------------------------------------------*/
135
136void BasicReader::
137_directReadVal(VariableMetaData* varmd, IData* data)
138{
139 info(4) << "DIRECT READ VAL v=" << varmd->fullName();
140
141 bool is_item_variable = !varmd->itemFamilyName().null();
142 Int32 nb_rank_to_read = m_nb_rank_to_read;
143 // If it is a variable that is not on the mesh,
144 // only one rank must be read because it is not
145 // certain that this variable is defined everywhere
146 if (!is_item_variable)
147 if (nb_rank_to_read > 1)
148 nb_rank_to_read = 1;
149
150 UniqueArray<Ref<IData>> allocated_data;
151 UniqueArray<IData*> written_data(nb_rank_to_read);
152
153 for (Integer i = 0; i < nb_rank_to_read; ++i) {
154 written_data[i] = data;
155 if ((nb_rank_to_read > 1 || m_want_parallel) && is_item_variable) {
156 Ref<IData> new_data = data->cloneEmptyRef();
157 written_data[i] = new_data.get();
158 allocated_data.add(new_data);
159 }
160 String vname = varmd->fullName();
161 info(4) << " TRY TO READ var_full_name=" << vname;
162 m_global_readers[i]->readData(vname, written_data[i]);
163 if (i == 0 && m_comparison_hash_algorithm.get())
164 info(5) << "COMPARISON_HASH =" << m_global_readers[i]->comparisonHashValue(vname);
165 }
166
167 if (is_item_variable) {
168 Ref<ParallelDataReader> parallel_data_reader = _getReader(varmd);
169
170 Int64UniqueArray full_written_unique_ids;
171 IData* full_written_data = nullptr;
172
173 if (nb_rank_to_read == 0) {
174 // Nothing to read
175 // However, we must pass through the parallel reader
176 // to ensure collective operations
177 full_written_data = nullptr;
178 }
179 else if (nb_rank_to_read == 1) {
180 //full_written_unique_ids = written_unique_ids[0];
181 full_written_data = written_data[0];
182 }
183 else {
184 // We must create a data object that contains the union of written_data
185 Ref<IData> allocated_written_data = data->cloneEmptyRef();
186 allocated_data.add(allocated_written_data);
187 full_written_data = allocated_written_data.get();
188 SerializeBuffer sbuf;
189 sbuf.setMode(ISerializer::ModeReserve);
190 for (Int32 i = 0; i < nb_rank_to_read; ++i)
191 written_data[i]->serialize(&sbuf, nullptr);
192 sbuf.allocateBuffer();
193 sbuf.setMode(ISerializer::ModePut);
194 for (Int32 i = 0; i < nb_rank_to_read; ++i)
195 written_data[i]->serialize(&sbuf, nullptr);
196 sbuf.setMode(ISerializer::ModeGet);
197 sbuf.setReadMode(ISerializer::ReadAdd);
198 for (Int32 i = 0; i < nb_rank_to_read; ++i)
199 full_written_data->serialize(&sbuf, nullptr);
200 }
201 if (data != full_written_data) {
202 info(5) << "PARALLEL READ";
203 parallel_data_reader->getSortedValues(full_written_data, data);
204 }
205 }
206}
207
208/*---------------------------------------------------------------------------*/
209/*---------------------------------------------------------------------------*/
210
211Ref<ParallelDataReader> BasicReader::
212_getReader(VariableMetaData* varmd)
213{
214 Int32 nb_to_read = m_nb_rank_to_read;
215
216 // For reading, during a restart, the group (var->itemGroup())
217 // associated with the variable and the family (var->itemFamily())
218 // do not exist yet. Therefore, they should not be used.
219 const String& var_group_name = varmd->itemGroupName();
220 String group_full_name = varmd->meshName() + "_" + varmd->itemFamilyName() + "_" + var_group_name;
221 auto ix = m_parallel_data_readers.find(group_full_name);
222 if (ix != m_parallel_data_readers.end())
223 return ix->second;
224
225 IParallelMng* pm = m_parallel_mng;
226 Ref<ParallelDataReader> reader = makeRef(new ParallelDataReader(pm));
227 {
228 UniqueArray<SharedArray<Int64>> written_unique_ids(nb_to_read);
229 Int64Array& wanted_unique_ids = reader->wantedUniqueIds();
230 for (Integer i = 0; i < nb_to_read; ++i) {
231 m_global_readers[i]->readItemGroup(group_full_name, written_unique_ids[i], wanted_unique_ids);
232 }
233 // This should only be active for comparisons (not during restarts because the groups
234 // are not valid)
235 if (m_item_group_finder) {
236 ItemGroup ig = m_item_group_finder->getWantedGroup(varmd);
237 _fillUniqueIds(ig, wanted_unique_ids);
238 }
239 for (Integer i = 0; i < nb_to_read; ++i) {
240 Integer nb_uid = written_unique_ids[i].size();
241 if (nb_uid >= 1)
242 info(5) << "PART I=" << i
243 << " nb_uid=" << nb_uid
244 << " min_uid=" << written_unique_ids[i][0]
245 << " max_uid=" << written_unique_ids[i][nb_uid - 1];
246 }
247 Int64Array& full_written_unique_ids = reader->writtenUniqueIds();
248 for (Integer i = 0; i < nb_to_read; ++i)
249 full_written_unique_ids.addRange(written_unique_ids[i]);
250 info(5) << "FULL UID SIZE=" << full_written_unique_ids.size();
251 if (m_want_parallel)
252 reader->sort();
253 }
254 m_parallel_data_readers.insert(std::make_pair(group_full_name, reader));
255 return reader;
256}
257
258/*---------------------------------------------------------------------------*/
259/*---------------------------------------------------------------------------*/
260
266void BasicReader::
267fillComparisonHash(std::map<String, String>& comparison_hash_map)
268{
269 comparison_hash_map.clear();
270 if (m_nb_rank_to_read == 0)
271 return;
272 if (m_parallel_mng->commRank() != m_parallel_mng->masterIORank())
273 return;
274 const VariableDataInfoMap& var_map = m_global_readers[0]->variablesDataInfoMap();
275 for (auto v : var_map) {
276 VariableDataInfo* vd = v.second.get();
277 comparison_hash_map.try_emplace(vd->fullName(), vd->comparisonHashValue());
278 }
279}
280
281/*---------------------------------------------------------------------------*/
282/*---------------------------------------------------------------------------*/
283
284void BasicReader::
285read(IVariable* var, IData* data)
286{
287 info(4) << "MASTER READ var=" << var->fullName() << " data=" << data;
288 if (var->isPartial()) {
289 info() << "** WARNING: partial variable not implemented in BasicReaderWriter";
290 return;
291 }
293 _directReadVal(vmd.get(), data);
294}
295
296/*---------------------------------------------------------------------------*/
297/*---------------------------------------------------------------------------*/
298
299void BasicReader::
300read(const VariableDataReadInfo& infos)
301{
302 VariableMetaData* vmd = infos.variableMetaData();
303 IData* data = infos.data();
304 info(4) << "MASTER2 READ var=" << vmd->fullName() << " data=" << data;
305 if (vmd->isPartial()) {
306 info() << "** WARNING: partial variable not implemented in BasicReaderWriter";
307 return;
308 }
309 _directReadVal(vmd, data);
310}
311
312/*---------------------------------------------------------------------------*/
313/*---------------------------------------------------------------------------*/
314
315String BasicReader::
316metaData()
317{
318 ByteUniqueArray bytes;
319 fillMetaData(bytes);
320 String s(bytes);
321 info(5) << " S=" << s << '\n';
322 return s;
323}
324
325/*---------------------------------------------------------------------------*/
326/*---------------------------------------------------------------------------*/
327
328void BasicReader::
329fillMetaData(ByteArray& bytes)
330{
331 Int32 rank = m_parallel_mng->commRank();
332 if (m_forced_rank_to_read >= 0)
333 rank = m_forced_rank_to_read;
334 if (m_version >= 3) {
335 Int64 meta_data_size = 0;
336 String key_name = "Global:CheckpointMetadata";
337 info(4) << "Reading checkpoint metadata from database";
338 m_forced_rank_to_read_text_reader->getExtents(key_name, Int64ArrayView(1, &meta_data_size));
339 bytes.resize(meta_data_size);
340 m_forced_rank_to_read_text_reader->read(key_name, asWritableBytes(bytes.span()));
341 }
342 else {
343 String filename = _getMetaDataFileName(rank);
344 info(4) << "Reading checkpoint metadata file=" << filename;
345 platform::readAllFile(filename, false, bytes);
346 }
347}
348
349/*---------------------------------------------------------------------------*/
350/*---------------------------------------------------------------------------*/
351
352void BasicReader::
353_setRanksToRead()
354{
355 IParallelMng* pm = m_parallel_mng;
356 Int32 my_rank = pm->commRank();
357 Int32 nb_rank = pm->commSize();
358 Int32 first_to_read = my_rank;
359 Int32 nb_to_read = 1;
360
361 if (m_forced_rank_to_read >= 0) {
362 m_first_rank_to_read = m_forced_rank_to_read;
363 m_nb_rank_to_read = 1;
364 return;
365 }
366 if (nb_rank == 1) {
367 nb_to_read = m_nb_written_part;
368 }
369 else if (nb_rank < m_nb_written_part) {
370 // There are more files than processors.
371 // Therefore, one of the processors must read at least two files.
372 // For those cases, these files must be consecutive so that the
373 // interval of uniqueId() of the processor's entities is consecutive
374 Int32 nb_part_per_rank = m_nb_written_part / nb_rank;
375 Int32 remaining_nb_part = m_nb_written_part - (nb_part_per_rank * nb_rank);
376 first_to_read = nb_part_per_rank * my_rank;
377 nb_to_read = nb_part_per_rank;
378 info(4) << "NB_PART_PER_RANK = " << nb_part_per_rank
379 << " REMAINING=" << remaining_nb_part;
380 if (my_rank >= remaining_nb_part) {
381 first_to_read += remaining_nb_part;
382 }
383 else {
384 first_to_read += my_rank;
385 ++nb_to_read;
386 }
387 }
388 else {
389 if (my_rank >= m_nb_written_part) {
390 // No part to read
391 nb_to_read = 0;
392 }
393 }
394
395 m_first_rank_to_read = first_to_read;
396 m_nb_rank_to_read = nb_to_read;
397}
398
399/*---------------------------------------------------------------------------*/
400/*---------------------------------------------------------------------------*/
401
402Ref<IGenericReader> BasicReader::
403_readOwnMetaDataAndCreateReader(Int32 rank)
404{
405 String main_filename = _getBasicVariableFile(m_version, m_path, rank);
406 Ref<KeyValueTextReader> text_reader;
407 if (m_version >= 3) {
408 // If the rank is the same as m_forced_rank_to_read, then the already created
409 // reader can be reused.
410 if (rank == m_forced_rank_to_read)
411 text_reader = m_forced_rank_to_read_text_reader;
412 else {
413 text_reader = makeRef(new KeyValueTextReader(traceMng(), main_filename, m_version));
414 // This reader must have the same compression manager
415 // as the one already created
416 text_reader->setDataCompressor(m_forced_rank_to_read_text_reader->dataCompressor());
417 text_reader->setHashAlgorithm(m_forced_rank_to_read_text_reader->hashAlgorithm());
418 }
419 }
420
421 auto r = makeRef<IGenericReader>(new BasicGenericReader(m_application, m_version, text_reader));
422 r->initialize(m_path, rank);
423 return r;
424}
425
426/*---------------------------------------------------------------------------*/
427/*---------------------------------------------------------------------------*/
428
429void BasicReader::
430beginRead(const VariableCollection& vars)
431{
432 ARCANE_UNUSED(vars);
433 beginRead(DataReaderInfo());
434}
435
436/*---------------------------------------------------------------------------*/
437/*---------------------------------------------------------------------------*/
438
439void BasicReader::
440beginRead(const DataReaderInfo& infos)
441{
442 ARCANE_UNUSED(infos);
443 info(4) << "** ** BEGIN READ";
444
445 _setRanksToRead();
446 info(4) << "RanksToRead: FIRST TO READ =" << m_first_rank_to_read
447 << " nb=" << m_nb_rank_to_read << " version=" << m_version;
448 m_global_readers.resize(m_nb_rank_to_read);
449 for (Integer i = 0; i < m_nb_rank_to_read; ++i)
450 m_global_readers[i] = _readOwnMetaDataAndCreateReader(i + m_first_rank_to_read);
451}
452
453/*---------------------------------------------------------------------------*/
454/*---------------------------------------------------------------------------*/
455
456} // namespace Arcane::impl
457
458/*---------------------------------------------------------------------------*/
459/*---------------------------------------------------------------------------*/
void addRange(ConstReferenceType val, Int64 n)
Adds n elements of value val to the end of the array.
Data reading information.
Interface of a data item.
Definition IData.h:34
Interface of the parallelism manager for a subdomain.
virtual Int32 commRank() const =0
Rank of this instance in the communicator.
virtual Int32 commSize() const =0
Number of instances in the communicator.
Interface of a variable.
Definition IVariable.h:40
virtual String fullName() const =0
Full variable name (with family prefix).
virtual Ref< VariableMetaData > createMetaDataRef() const =0
Creates an instance containing the variable's metadata.
virtual bool isPartial() const =0
Indicates if the variable is partial.
InstanceType * get() const
Associated instance or nullptr if none.
Reference to an instance.
TraceMessage info() const
Flow for an information message.
Data reading information for a variable.
Metadata on a variable.
String fullName() const
Full name of the variable.
Ref< KeyValueTextReader > m_forced_rank_to_read_text_reader
Reader for the first rank to read.
Definition BasicReader.h:91
void fillMetaData(ByteArray &bytes) override
Fills bytes with the metadata content.
Associative map of variable data.
Variable data information.
bool readAllFile(StringView filename, bool is_binary, ByteArray &out_bytes)
Reads the content of a file and stores it in out_bytes.
Array< Int64 > Int64Array
Dynamic one-dimensional array of 64-bit integers.
Definition UtilsTypes.h:125
ArrayView< Int64 > Int64ArrayView
C equivalent of a 1D array of 64-bit integers.
Definition UtilsTypes.h:451
UniqueArray< Int64 > Int64UniqueArray
Dynamic 1D array of 64-bit integers.
Definition UtilsTypes.h:339
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
Array< Byte > ByteArray
Dynamic one-dimensional array of characters.
Definition UtilsTypes.h:121
UniqueArray< Byte > ByteUniqueArray
Dynamic 1D array of characters.
Definition UtilsTypes.h:335
ArrayView< Int32 > Int32ArrayView
C equivalent of a 1D array of 32-bit integers.
Definition UtilsTypes.h:453
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
Impl::SpanTypeFromSize< std::byte, SizeType >::SpanType asWritableBytes(const SpanImpl< DataType, SizeType, Extent > &s)
Converts the view into an array of modifiable bytes.
Definition Span.h:1068
std::int32_t Int32
Signed integer type of 32 bits.