14#include "arcane/std/internal/BasicReader.h"
16#include "arcane/utils/PlatformUtils.h"
17#include "arcane/utils/JSONReader.h"
18#include "arcane/utils/Ref.h"
20#include "arcane/core/IParallelMng.h"
21#include "arcane/core/IIOMng.h"
22#include "arcane/core/IData.h"
23#include "arcane/core/ItemGroup.h"
24#include "arcane/core/IVariable.h"
25#include "arcane/core/SerializeBuffer.h"
27#include "arcane/std/internal/ParallelDataReader.h"
// Constructor of BasicReader.
// Only part of the member-initializer list is visible in this listing; the
// visible entries record the requested read configuration:
//  - want_parallel and forced_rank_to_read are stored as given;
//  - the first rank to read starts at 0;
//  - no item group finder is set initially.
39BasicReader(IApplication* app, IParallelMng* pm,
Int32 forced_rank_to_read,
40 const String& path,
bool want_parallel)
42, m_want_parallel(want_parallel)
45, m_first_rank_to_read(0)
47, m_forced_rank_to_read(forced_rank_to_read)
48, m_item_group_finder(nullptr)
// Initialization of the reader: determines the checkpoint format version and
// the number of written parts, then (for version >= 3) opens the main
// key/value text reader and configures its compressor and hash algorithms.
// NOTE(review): several original lines (braces, else branches, some guards)
// are missing from this listing; the comments below describe only the
// statements that are visible.
58 info() <<
"BasicReader::initialize()";
60 IParallelMng* pm = m_parallel_mng;
// Path of the JSON database file describing the checkpoint.
65 String db_filename = String::concat(m_path,
"/arcane_acr_db.json");
66 Int32 has_db_file = 0;
// The readability flag is broadcast from the master I/O rank so every rank
// uses the same value.
// NOTE(review): presumably only the master I/O rank tests the file; the
// guarding condition is not visible here.
69 has_db_file = platform::isFileReadable(db_filename) ? 1 : 0;
70 pm->broadcast(
Int32ArrayView(1, &has_db_file), pm->masterIORank());
// Names read from the database; they stay empty when not provided.
72 String data_compressor_name;
73 String hash_algorithm_name;
74 String comparison_hash_algorithm_name;
// Database path: the JSON file is read collectively and then parsed.
76 UniqueArray<Byte> bytes;
77 pm->ioMng()->collectiveRead(db_filename, bytes,
false);
78 JSONDocument json_doc;
79 json_doc.parse(bytes, db_filename);
80 JSONValue root = json_doc.root();
81 JSONValue jv_arcane_db = root.expectedChild(_getArcaneDBTag());
// "Version" and "NbPart" are fetched with expectedChild(), the three
// algorithm names with child().
// NOTE(review): presumably expectedChild() fails when the key is absent
// while child() tolerates it; confirm against JSONReader.
82 m_version = jv_arcane_db.expectedChild(
"Version").valueAsInt32();
83 m_nb_written_part = jv_arcane_db.expectedChild(
"NbPart").valueAsInt32();
84 data_compressor_name = jv_arcane_db.child(
"DataCompressor").value();
85 hash_algorithm_name = jv_arcane_db.child(
"HashAlgorithm").value();
86 comparison_hash_algorithm_name = jv_arcane_db.child(
"ComparisonHashAlgorithm").value();
87 info() <<
"**--** Begin read using database version=" << m_version
88 <<
" nb_part=" << m_nb_written_part
89 <<
" compressor=" << data_compressor_name
90 <<
" hash_algorithm=" << hash_algorithm_name
91 <<
" comparison_hash_algorithm=" << comparison_hash_algorithm_name;
// Path using 'infos.txt': the master I/O rank opens the file, then the number
// of parts is broadcast to all ranks.
// NOTE(review): the declaration and reading of 'nb_part' are on a line not
// visible in this listing; this is presumably the branch taken when no
// database file exists.
98 if (pm->isMasterIO()) {
100 String filename = String::concat(m_path,
"/infos.txt");
101 std::ifstream ifile(filename.localstr());
103 info(4) <<
"** NB PART=" << nb_part;
104 m_nb_written_part = nb_part;
106 pm->broadcast(
Int32ArrayView(1, &m_nb_written_part), pm->masterIORank());
// Version >= 3: create the main text reader now.
108 if (m_version >= 3) {
// Use the forced rank when one was given (>= 0); otherwise, when more than
// one part was written, use this process's own rank.
// NOTE(review): the value used when only one part was written is set on a
// line not visible here.
109 Int32 rank_to_read = m_forced_rank_to_read;
110 if (rank_to_read < 0) {
111 if (m_nb_written_part > 1)
112 rank_to_read = pm->commRank();
116 String main_filename = _getBasicVariableFile(m_version, m_path, rank_to_read);
117 m_forced_rank_to_read_text_reader =
makeRef(
new KeyValueTextReader(traceMng(), main_filename, m_version));
// Each optional service is only created when its name is non-empty.
118 if (!data_compressor_name.empty()) {
119 Ref<IDataCompressor> dc = _createDeflater(m_application, data_compressor_name);
120 m_forced_rank_to_read_text_reader->setDataCompressor(dc);
122 if (!hash_algorithm_name.empty()) {
123 Ref<IHashAlgorithm> v = _createHashAlgorithm(m_application, hash_algorithm_name);
124 m_forced_rank_to_read_text_reader->setHashAlgorithm(v);
// The comparison hash algorithm is kept on the BasicReader itself rather than
// on the text reader.
126 if (!comparison_hash_algorithm_name.empty()) {
127 Ref<IHashAlgorithm> v = _createHashAlgorithm(m_application, comparison_hash_algorithm_name);
128 m_comparison_hash_algorithm = v;
// Reads the values of the variable described by 'varmd' into 'data'.
//
// The values are read from each of the m_nb_rank_to_read readers in
// m_global_readers. When several parts are read they are merged through a
// serialization buffer, and for item variables the merged values are passed to
// a ParallelDataReader together with 'data'.
// NOTE(review): several original lines (braces, else branches) are missing
// from this listing; comments describe only the visible statements.
137_directReadVal(VariableMetaData* varmd, IData* data)
139 info(4) <<
"DIRECT READ VAL v=" << varmd->fullName();
// A variable is treated as an item variable when it has an item family name.
141 bool is_item_variable = !varmd->itemFamilyName().null();
142 Int32 nb_rank_to_read = m_nb_rank_to_read;
// NOTE(review): the statement controlled by these two conditions is not
// visible; presumably it limits non-item variables to a single part.
146 if (!is_item_variable)
147 if (nb_rank_to_read > 1)
// 'allocated_data' owns the temporary data objects; 'written_data' holds the
// raw pointers actually used for reading.
150 UniqueArray<Ref<IData>> allocated_data;
151 UniqueArray<IData*> written_data(nb_rank_to_read);
153 for (Integer i = 0; i < nb_rank_to_read; ++i) {
// By default read directly into 'data'; item variables read into an empty
// clone when several parts are read or when parallel reading is requested.
154 written_data[i] = data;
155 if ((nb_rank_to_read > 1 || m_want_parallel) && is_item_variable) {
156 Ref<IData> new_data = data->cloneEmptyRef();
157 written_data[i] = new_data.get();
158 allocated_data.add(new_data);
160 String vname = varmd->fullName();
161 info(4) <<
" TRY TO READ var_full_name=" << vname;
162 m_global_readers[i]->readData(vname, written_data[i]);
// The comparison hash is only logged for the first reader, and only when a
// comparison hash algorithm is set.
163 if (i==0 && m_comparison_hash_algorithm.get() )
164 info(5) <<
"COMPARISON_HASH =" << m_global_readers[i]->comparisonHashValue(vname);
167 if (is_item_variable) {
168 Ref<ParallelDataReader> parallel_data_reader = _getReader(varmd);
// 'full_written_data' designates the data holding the values of all parts read.
171 IData* full_written_data =
nullptr;
// Nothing was read: no written data.
173 if (nb_rank_to_read == 0) {
177 full_written_data =
nullptr;
// A single part: use it directly.
179 else if (nb_rank_to_read == 1) {
181 full_written_data = written_data[0];
// Several parts: serialize every part into one buffer (reserve pass, then put
// pass), then read the buffer back into a fresh empty clone.
// NOTE(review): ReadAdd presumably appends each deserialized part to the
// existing content; confirm against ISerializer.
185 Ref<IData> allocated_written_data = data->cloneEmptyRef();
186 allocated_data.add(allocated_written_data);
187 full_written_data = allocated_written_data.get();
188 SerializeBuffer sbuf;
189 sbuf.setMode(ISerializer::ModeReserve);
190 for (Int32 i = 0; i < nb_rank_to_read; ++i)
191 written_data[i]->serialize(&sbuf,
nullptr);
192 sbuf.allocateBuffer();
193 sbuf.setMode(ISerializer::ModePut);
194 for (Int32 i = 0; i < nb_rank_to_read; ++i)
195 written_data[i]->serialize(&sbuf,
nullptr);
196 sbuf.setMode(ISerializer::ModeGet);
197 sbuf.setReadMode(ISerializer::ReadAdd);
198 for (Int32 i = 0; i < nb_rank_to_read; ++i)
199 full_written_data->serialize(&sbuf,
nullptr);
// When the values were not read directly into 'data', let the parallel data
// reader fill 'data' from the written values.
201 if (data != full_written_data) {
202 info(5) <<
"PARALLEL READ";
203 parallel_data_reader->getSortedValues(full_written_data, data);
// Returns the ParallelDataReader associated with the item group of 'varmd'.
//
// Readers are cached in m_parallel_data_readers under the key
// "<mesh name>_<item family name>_<group name>". When no entry exists, a new
// reader is created, filled with the unique ids written in each part and the
// wanted unique ids, then stored in the cache.
// NOTE(review): some original lines (returns, braces) are missing from this
// listing; comments describe only the visible statements.
211Ref<ParallelDataReader> BasicReader::
212_getReader(VariableMetaData* varmd)
214 Int32 nb_to_read = m_nb_rank_to_read;
219 const String& var_group_name = varmd->itemGroupName();
220 String group_full_name = varmd->meshName() +
"_" + varmd->itemFamilyName() +
"_" + var_group_name;
// Cache lookup; the statement executed on a hit is not visible here
// (presumably it returns the cached reader).
221 auto ix = m_parallel_data_readers.find(group_full_name);
222 if (ix != m_parallel_data_readers.end())
225 IParallelMng* pm = m_parallel_mng;
226 Ref<ParallelDataReader> reader =
makeRef(
new ParallelDataReader(pm));
// One array of written unique ids per part to read.
228 UniqueArray<SharedArray<Int64>> written_unique_ids(nb_to_read);
229 Int64Array& wanted_unique_ids = reader->wantedUniqueIds();
230 for (Integer i = 0; i < nb_to_read; ++i) {
231 m_global_readers[i]->readItemGroup(group_full_name, written_unique_ids[i], wanted_unique_ids);
// When an item group finder is available, the wanted unique ids are taken
// from the group it returns for this variable.
235 if (m_item_group_finder) {
236 ItemGroup ig = m_item_group_finder->getWantedGroup(varmd);
237 _fillUniqueIds(ig, wanted_unique_ids);
239 for (Integer i = 0; i < nb_to_read; ++i) {
240 Integer nb_uid = written_unique_ids[i].size();
// NOTE(review): this log indexes [0] and [nb_uid - 1], which is only valid
// when nb_uid > 0; the guard is presumably on a line not visible here.
242 info(5) <<
"PART I=" << i
243 <<
" nb_uid=" << nb_uid
244 <<
" min_uid=" << written_unique_ids[i][0]
245 <<
" max_uid=" << written_unique_ids[i][nb_uid - 1];
// Concatenate the unique ids of every part, in part order, into the reader.
247 Int64Array& full_written_unique_ids = reader->writtenUniqueIds();
248 for (Integer i = 0; i < nb_to_read; ++i)
249 full_written_unique_ids.
addRange(written_unique_ids[i]);
250 info(5) <<
"FULL UID SIZE=" << full_written_unique_ids.size();
// Store the new reader in the cache for later calls on the same group.
254 m_parallel_data_readers.insert(std::make_pair(group_full_name, reader));
// Fills 'comparison_hash_map' with one entry per variable: the key is the
// variable's full name and the value its comparison hash.
// The map is always cleared first.
// NOTE(review): the statements controlled by the two 'if' tests and the
// definitions of 'var_map' and 'vd' are not visible in this listing;
// presumably the function returns early when no rank is read or when this
// rank is not the master I/O rank.
266fillComparisonHash(std::map<String, String>& comparison_hash_map)
268 comparison_hash_map.clear();
269 if (m_nb_rank_to_read==0)
271 if (m_parallel_mng->commRank()!=m_parallel_mng->masterIORank())
274 for(
auto v : var_map){
// try_emplace() leaves an existing entry untouched if the name is already present.
276 comparison_hash_map.try_emplace(vd->fullName(),vd->comparisonHashValue());
// Fragment of a single-variable read routine (its header is not visible in
// this listing): logs the variable and the target data, warns about partial
// variables, then delegates the actual read to _directReadVal().
// NOTE(review): the condition guarding the warning and the creation of 'vmd'
// are on lines not visible here.
286 info(4) <<
"MASTER READ var=" << var->
fullName() <<
" data=" << data;
288 info() <<
"** WARNING: partial variable not implemented in BasicReaderWriter";
292 _directReadVal(vmd.
get(), data);
// Fragment of a read routine taking an 'infos' object (its header is not
// visible in this listing): retrieves the target data from 'infos', warns when
// the variable is partial, then delegates the actual read to _directReadVal().
// NOTE(review): whether the partial case returns before the read cannot be
// determined from the visible lines.
302 IData* data = infos.data();
303 info(4) <<
"MASTER2 READ var=" << vmd->
fullName() <<
" data=" << data;
304 if (vmd->isPartial()) {
305 info() <<
"** WARNING: partial variable not implemented in BasicReaderWriter";
308 _directReadVal(vmd, data);
320 info(5) <<
" S=" << s <<
'\n';
// Fragment of the checkpoint metadata read (its header is not visible in this
// listing; presumably fillMetaData(), which fills 'bytes').
// The rank used is this process's rank unless a forced rank (>= 0) was given.
330 Int32 rank = m_parallel_mng->commRank();
331 if (m_forced_rank_to_read >= 0)
332 rank = m_forced_rank_to_read;
// Version >= 3: metadata come from the database under a fixed key.
// NOTE(review): the calls that obtain 'meta_data_size' and read the content
// are on lines not visible here.
333 if (m_version >= 3) {
334 Int64 meta_data_size = 0;
335 String key_name =
"Global:CheckpointMetadata";
336 info(4) <<
"Reading checkpoint metadata from database";
338 bytes.
resize(meta_data_size);
// Other case: metadata come from a per-rank file.
// NOTE(review): presumably the else branch of the version test; the branch
// keyword is not visible here.
342 String filename = _getMetaDataFileName(rank);
343 info(4) <<
"Reading checkpoint metadata file=" << filename;
// Fragment computing which written parts this rank reads (its header is not
// visible in this listing; the block name is a guess). The result is stored in
// m_first_rank_to_read and m_nb_rank_to_read.
// NOTE(review): the definitions of 'my_rank' and 'nb_rank' and several
// branches are on lines not visible here.
// Default: each rank reads only the part with its own rank number.
357 Int32 first_to_read = my_rank;
358 Int32 nb_to_read = 1;
// A forced rank overrides everything: read exactly that part.
360 if (m_forced_rank_to_read >= 0) {
361 m_first_rank_to_read = m_forced_rank_to_read;
362 m_nb_rank_to_read = 1;
// NOTE(review): the condition under which all written parts are read is not
// visible here.
366 nb_to_read = m_nb_written_part;
// Fewer ranks than written parts: each rank reads nb_part_per_rank parts,
// starting at nb_part_per_rank * my_rank, and the starting index is then
// shifted to account for the parts left over by the integer division.
368 else if (nb_rank < m_nb_written_part) {
374 Int32 nb_part_per_rank = m_nb_written_part / nb_rank;
375 Int32 remaining_nb_part = m_nb_written_part - (nb_part_per_rank * nb_rank);
376 first_to_read = nb_part_per_rank * my_rank;
377 nb_to_read = nb_part_per_rank;
378 info(4) <<
"NB_PART_PER_RANK = " << nb_part_per_rank
379 <<
" REMAINING=" << remaining_nb_part;
// Ranks at or beyond the remainder shift their start by the whole remainder;
// the following statement shifts by my_rank instead.
// NOTE(review): presumably the my_rank shift is the else branch, for ranks
// that also read one extra part; those lines are not visible.
380 if (my_rank >= remaining_nb_part) {
381 first_to_read += remaining_nb_part;
384 first_to_read += my_rank;
// NOTE(review): the statements executed when my_rank >= m_nb_written_part are
// not visible; presumably such a rank has nothing to read.
389 if (my_rank >= m_nb_written_part) {
395 m_first_rank_to_read = first_to_read;
396 m_nb_rank_to_read = nb_to_read;
// Creates and initializes the generic reader used to read the part written by
// rank 'rank'.
// NOTE(review): some original lines (else keyword, braces, return) are missing
// from this listing; comments describe only the visible statements.
402Ref<IGenericReader> BasicReader::
403_readOwnMetaDataAndCreateReader(Int32 rank)
405 String main_filename = _getBasicVariableFile(m_version, m_path, rank);
// Stays a default-constructed reference when m_version < 3.
406 Ref<KeyValueTextReader> text_reader;
407 if (m_version >= 3) {
// Reuse the text reader already stored in m_forced_rank_to_read_text_reader
// when 'rank' is the forced rank.
410 if (rank == m_forced_rank_to_read)
411 text_reader = m_forced_rank_to_read_text_reader;
// Otherwise a new text reader is created for this part's file and configured
// with the same data compressor and hash algorithm as the stored reader.
// NOTE(review): presumably this is the else branch of the test above; the
// branch keyword is not visible here.
413 text_reader =
makeRef(
new KeyValueTextReader(traceMng(), main_filename, m_version));
416 text_reader->setDataCompressor(m_forced_rank_to_read_text_reader->dataCompressor());
417 text_reader->setHashAlgorithm(m_forced_rank_to_read_text_reader->hashAlgorithm());
421 auto r = makeRef<IGenericReader>(
new BasicGenericReader(m_application, m_version, text_reader));
422 r->initialize(m_path, rank);
// Overload taking a variable collection: forwards to the DataReaderInfo
// overload with a default-constructed DataReaderInfo.
// NOTE(review): whether 'vars' is used on the lines not visible here cannot be
// determined from this listing.
430beginRead(
const VariableCollection& vars)
433 beginRead(DataReaderInfo());
// Body of the overload taking 'infos' (its header is not visible in this
// listing). 'infos' is explicitly marked as unused.
442 ARCANE_UNUSED(infos);
443 info(4) <<
"** ** BEGIN READ";
446 info(4) <<
"RanksToRead: FIRST TO READ =" << m_first_rank_to_read
447 <<
" nb=" << m_nb_rank_to_read <<
" version=" << m_version;
// Create one generic reader per part to read; part indices are consecutive
// starting at m_first_rank_to_read.
448 m_global_readers.resize(m_nb_rank_to_read);
449 for (
Integer i = 0; i < m_nb_rank_to_read; ++i)
450 m_global_readers[i] = _readOwnMetaDataAndCreateReader(i + m_first_rank_to_read);
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
void addRange(ConstReferenceType val, Int64 n)
Ajoute n éléments de valeur val à la fin du tableau.
Span< const T > span() const
Vue immutable sur ce tableau.
Informations de relecture des données.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual Int32 commSize() const =0
Nombre d'instances dans le communicateur.
Interface d'une variable.
virtual String fullName() const =0
Nom complet de la variable (avec le préfixe de la famille)
virtual Ref< VariableMetaData > createMetaDataRef() const =0
Crée une instance contenant les méta-données de la variable.
virtual bool isPartial() const =0
Indique si la variable est partielle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Chaîne de caractères unicode.
TraceMessage info() const
Flot pour un message d'information.
Informations de relecture des données d'une variable.
Ref< KeyValueTextReader > m_forced_rank_to_read_text_reader
Lecteur pour le premier rang à lire.
void fillMetaData(ByteArray &bytes) override
Remplit bytes avec le contenu des méta-données.
Tableau associatif des données des variables.
Informations sur les données d'une variable.
Array< Int64 > Int64Array
Tableau dynamique à une dimension d'entiers 64 bits.
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
impl::SpanTypeFromSize< std::byte, SizeType >::SpanType asWritableBytes(const SpanImpl< DataType, SizeType, Extent > &s)
Convertit la vue en un tableau d'octets modifiables.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
UniqueArray< Byte > ByteUniqueArray
Tableau dynamique à une dimension de caractères.
ArrayView< Int32 > Int32ArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Crée une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.