14#include "arcane/std/internal/BasicReader.h"
16#include "arcane/utils/PlatformUtils.h"
17#include "arcane/utils/JSONReader.h"
18#include "arcane/utils/Ref.h"
20#include "arcane/core/IParallelMng.h"
21#include "arcane/core/IIOMng.h"
22#include "arcane/core/IData.h"
23#include "arcane/core/ItemGroup.h"
24#include "arcane/core/IVariable.h"
25#include "arcane/core/SerializeBuffer.h"
27#include "arcane/std/internal/ParallelDataReader.h"
39BasicReader(IApplication* app, IParallelMng* pm,
Int32 forced_rank_to_read,
40 const String& path,
bool want_parallel)
41: BasicReaderWriterCommon(app, pm, path, BasicReaderWriterCommon::OpenModeRead)
42, m_want_parallel(want_parallel)
45, m_first_rank_to_read(0)
47, m_forced_rank_to_read(forced_rank_to_read)
48, m_item_group_finder(nullptr)
58 info() <<
"BasicReader::initialize()";
60 IParallelMng* pm = m_parallel_mng;
65 String db_filename = String::concat(m_path,
"/arcane_acr_db.json");
66 Int32 has_db_file = 0;
69 has_db_file = platform::isFileReadable(db_filename) ? 1 : 0;
72 String data_compressor_name;
73 String hash_algorithm_name;
74 String comparison_hash_algorithm_name;
76 UniqueArray<Byte> bytes;
77 pm->
ioMng()->collectiveRead(db_filename, bytes,
false);
78 JSONDocument json_doc;
79 json_doc.parse(bytes, db_filename);
80 JSONValue root = json_doc.root();
81 JSONValue jv_arcane_db = root.expectedChild(_getArcaneDBTag());
82 m_version = jv_arcane_db.expectedChild(
"Version").valueAsInt32();
83 m_nb_written_part = jv_arcane_db.expectedChild(
"NbPart").valueAsInt32();
84 data_compressor_name = jv_arcane_db.child(
"DataCompressor").value();
85 hash_algorithm_name = jv_arcane_db.child(
"HashAlgorithm").value();
86 comparison_hash_algorithm_name = jv_arcane_db.child(
"ComparisonHashAlgorithm").value();
87 info() <<
"**--** Begin read using database version=" << m_version
88 <<
" nb_part=" << m_nb_written_part
89 <<
" compressor=" << data_compressor_name
90 <<
" hash_algorithm=" << hash_algorithm_name
91 <<
" comparison_hash_algorithm=" << comparison_hash_algorithm_name;
100 String filename = String::concat(m_path,
"/infos.txt");
101 std::ifstream ifile(filename.localstr());
103 info(4) <<
"** NB PART=" << nb_part;
104 m_nb_written_part = nb_part;
108 if (m_version >= 3) {
109 Int32 rank_to_read = m_forced_rank_to_read;
110 if (rank_to_read < 0) {
111 if (m_nb_written_part > 1)
116 String main_filename = _getBasicVariableFile(m_version, m_path, rank_to_read);
117 m_forced_rank_to_read_text_reader =
makeRef(
new KeyValueTextReader(traceMng(), main_filename, m_version));
118 if (!data_compressor_name.empty()) {
119 Ref<IDataCompressor> dc = _createDeflater(m_application, data_compressor_name);
120 m_forced_rank_to_read_text_reader->setDataCompressor(dc);
122 if (!hash_algorithm_name.empty()) {
123 Ref<IHashAlgorithm> v = _createHashAlgorithm(m_application, hash_algorithm_name);
124 m_forced_rank_to_read_text_reader->setHashAlgorithm(v);
126 if (!comparison_hash_algorithm_name.empty()) {
127 Ref<IHashAlgorithm> v = _createHashAlgorithm(m_application, comparison_hash_algorithm_name);
128 m_comparison_hash_algorithm = v;
137_directReadVal(VariableMetaData* varmd, IData* data)
139 info(4) <<
"DIRECT READ VAL v=" << varmd->fullName();
141 bool is_item_variable = !varmd->itemFamilyName().null();
142 Int32 nb_rank_to_read = m_nb_rank_to_read;
146 if (!is_item_variable)
147 if (nb_rank_to_read > 1)
150 UniqueArray<Ref<IData>> allocated_data;
151 UniqueArray<IData*> written_data(nb_rank_to_read);
153 for (Integer i = 0; i < nb_rank_to_read; ++i) {
154 written_data[i] = data;
155 if ((nb_rank_to_read > 1 || m_want_parallel) && is_item_variable) {
156 Ref<IData> new_data = data->cloneEmptyRef();
157 written_data[i] = new_data.get();
158 allocated_data.add(new_data);
160 String vname = varmd->fullName();
161 info(4) <<
" TRY TO READ var_full_name=" << vname;
162 m_global_readers[i]->readData(vname, written_data[i]);
163 if (i==0 && m_comparison_hash_algorithm.get() )
164 info(5) <<
"COMPARISON_HASH =" << m_global_readers[i]->comparisonHashValue(vname);
167 if (is_item_variable) {
168 Ref<ParallelDataReader> parallel_data_reader = _getReader(varmd);
171 IData* full_written_data =
nullptr;
173 if (nb_rank_to_read == 0) {
177 full_written_data =
nullptr;
179 else if (nb_rank_to_read == 1) {
181 full_written_data = written_data[0];
185 Ref<IData> allocated_written_data = data->cloneEmptyRef();
186 allocated_data.
add(allocated_written_data);
187 full_written_data = allocated_written_data.get();
188 SerializeBuffer sbuf;
189 sbuf.setMode(ISerializer::ModeReserve);
190 for (
Int32 i = 0; i < nb_rank_to_read; ++i)
191 written_data[i]->serialize(&sbuf,
nullptr);
192 sbuf.allocateBuffer();
193 sbuf.setMode(ISerializer::ModePut);
194 for (
Int32 i = 0; i < nb_rank_to_read; ++i)
195 written_data[i]->serialize(&sbuf,
nullptr);
196 sbuf.setMode(ISerializer::ModeGet);
197 sbuf.setReadMode(ISerializer::ReadAdd);
198 for (
Int32 i = 0; i < nb_rank_to_read; ++i)
199 full_written_data->serialize(&sbuf,
nullptr);
201 if (data != full_written_data) {
202 info(5) <<
"PARALLEL READ";
203 parallel_data_reader->getSortedValues(full_written_data, data);
211Ref<ParallelDataReader> BasicReader::
212_getReader(VariableMetaData* varmd)
214 Int32 nb_to_read = m_nb_rank_to_read;
219 const String& var_group_name = varmd->itemGroupName();
220 String group_full_name = varmd->meshName() +
"_" + varmd->itemFamilyName() +
"_" + var_group_name;
221 auto ix = m_parallel_data_readers.find(group_full_name);
222 if (ix != m_parallel_data_readers.end())
225 IParallelMng* pm = m_parallel_mng;
226 Ref<ParallelDataReader> reader =
makeRef(
new ParallelDataReader(pm));
228 UniqueArray<SharedArray<Int64>> written_unique_ids(nb_to_read);
229 Int64Array& wanted_unique_ids = reader->wantedUniqueIds();
230 for (Integer i = 0; i < nb_to_read; ++i) {
231 m_global_readers[i]->readItemGroup(group_full_name, written_unique_ids[i], wanted_unique_ids);
235 if (m_item_group_finder) {
236 ItemGroup ig = m_item_group_finder->getWantedGroup(varmd);
237 _fillUniqueIds(ig, wanted_unique_ids);
239 for (Integer i = 0; i < nb_to_read; ++i) {
240 Integer nb_uid = written_unique_ids[i].size();
242 info(5) <<
"PART I=" << i
243 <<
" nb_uid=" << nb_uid
244 <<
" min_uid=" << written_unique_ids[i][0]
245 <<
" max_uid=" << written_unique_ids[i][nb_uid - 1];
247 Int64Array& full_written_unique_ids = reader->writtenUniqueIds();
248 for (Integer i = 0; i < nb_to_read; ++i)
249 full_written_unique_ids.
addRange(written_unique_ids[i]);
250 info(5) <<
"FULL UID SIZE=" << full_written_unique_ids.size();
254 m_parallel_data_readers.insert(std::make_pair(group_full_name, reader));
269 if (m_nb_rank_to_read==0)
271 if (m_parallel_mng->commRank()!=m_parallel_mng->masterIORank())
286 info(4) <<
"MASTER READ var=" <<
var->fullName() <<
" data=" << data;
287 if (
var->isPartial()) {
288 info() <<
"** WARNING: partial variable not implemented in BasicReaderWriter";
292 _directReadVal(
vmd.get(), data);
302 IData* data = infos.data();
303 info(4) <<
"MASTER2 READ var=" <<
vmd->fullName() <<
" data=" << data;
304 if (
vmd->isPartial()) {
305 info() <<
"** WARNING: partial variable not implemented in BasicReaderWriter";
308 _directReadVal(
vmd, data);
320 info(5) <<
" S=" << s <<
'\n';
330 Int32 rank = m_parallel_mng->commRank();
331 if (m_forced_rank_to_read >= 0)
332 rank = m_forced_rank_to_read;
333 if (m_version >= 3) {
336 info(4) <<
"Reading checkpoint metadata from database";
339 m_forced_rank_to_read_text_reader->read(
key_name, asWritableBytes(bytes.span()));
342 String filename = _getMetaDataFileName(rank);
343 info(4) <<
"Reading checkpoint metadata file=" << filename;
344 platform::readAllFile(filename,
false, bytes);
360 if (m_forced_rank_to_read >= 0) {
361 m_first_rank_to_read = m_forced_rank_to_read;
362 m_nb_rank_to_read = 1;
368 else if (nb_rank < m_nb_written_part) {
374 Int32 nb_part_per_rank = m_nb_written_part / nb_rank;
375 Int32 remaining_nb_part = m_nb_written_part - (nb_part_per_rank * nb_rank);
376 first_to_read = nb_part_per_rank * my_rank;
377 nb_to_read = nb_part_per_rank;
378 info(4) <<
"NB_PART_PER_RANK = " << nb_part_per_rank
379 <<
" REMAINING=" << remaining_nb_part;
380 if (my_rank >= remaining_nb_part) {
381 first_to_read += remaining_nb_part;
384 first_to_read += my_rank;
389 if (my_rank >= m_nb_written_part) {
395 m_first_rank_to_read = first_to_read;
396 m_nb_rank_to_read = nb_to_read;
402Ref<IGenericReader> BasicReader::
403_readOwnMetaDataAndCreateReader(
Int32 rank)
405 String main_filename = _getBasicVariableFile(m_version, m_path, rank);
406 Ref<KeyValueTextReader> text_reader;
407 if (m_version >= 3) {
410 if (rank == m_forced_rank_to_read)
411 text_reader = m_forced_rank_to_read_text_reader;
413 text_reader =
makeRef(
new KeyValueTextReader(traceMng(), main_filename, m_version));
416 text_reader->setDataCompressor(m_forced_rank_to_read_text_reader->dataCompressor());
417 text_reader->setHashAlgorithm(m_forced_rank_to_read_text_reader->hashAlgorithm());
421 auto r = makeRef<IGenericReader>(
new BasicGenericReader(m_application, m_version, text_reader));
422 r->initialize(m_path, rank);
430beginRead(
const VariableCollection& vars)
433 beginRead(DataReaderInfo());
442 ARCANE_UNUSED(infos);
443 info(4) <<
"** ** BEGIN READ";
446 info(4) <<
"RanksToRead: FIRST TO READ =" << m_first_rank_to_read
447 <<
" nb=" << m_nb_rank_to_read <<
" version=" << m_version;
448 m_global_readers.resize(m_nb_rank_to_read);
449 for (Integer i = 0; i < m_nb_rank_to_read; ++i)
450 m_global_readers[i] = _readOwnMetaDataAndCreateReader(i + m_first_rank_to_read);
Tableau d'items de types quelconques.
Informations de relecture des données.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual bool isMasterIO() const =0
true si l'instance est un gestionnaire maître des entrées/sorties.
virtual Int32 commSize() const =0
Nombre d'instance dans le communicateur.
virtual IIOMng * ioMng() const =0
Gestionnaire des entrées/sorties.
virtual Integer masterIORank() const =0
Rang de l'instance gérant les entrées/sorties (pour laquelle isMasterIO() est vrai)
Interface d'une variable.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Informations de relecture des données d'une variable.
Tableau associatif des données des variables.
Informations sur les données d'une variable.
Vue modifiable d'un tableau d'un type T.
void addRange(ConstReferenceType val, Int64 n)
Ajoute n élément de valeur val à la fin du tableau.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
Chaîne de caractères unicode.
Vecteur 1D de données avec sémantique par valeur (style STL).
Array< Int64 > Int64Array
Tableau dynamique à une dimension d'entiers 64 bits.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
ArrayView< Int32 > Int32ArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Int32 Integer
Type représentant un entier.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.