14#include "arcane/std/internal/BasicWriter.h"
16#include "arcane/utils/FatalErrorException.h"
17#include "arcane/utils/StringBuilder.h"
18#include "arcane/utils/PlatformUtils.h"
19#include "arcane/utils/JSONWriter.h"
20#include "arcane/utils/IDataCompressor.h"
21#include "arcane/utils/MemoryView.h"
22#include "arcane/utils/Ref.h"
23#include "arcane/utils/IHashAlgorithm.h"
25#include "arcane/core/IParallelMng.h"
26#include "arcane/core/ItemGroup.h"
27#include "arcane/core/IVariable.h"
28#include "arcane/core/IItemFamily.h"
29#include "arcane/core/IData.h"
30#include "arcane/core/internal/IVariableInternal.h"
32#include "arcane/std/internal/ParallelDataWriter.h"
44BasicWriter(IApplication* app, IParallelMng* pm,
const String& path,
45 eOpenMode open_mode,
Int32 version,
bool want_parallel)
47, m_want_parallel(want_parallel)
60 Int32 rank = m_parallel_mng->commRank();
61 if (m_open_mode == OpenModeTruncate && m_parallel_mng->isMasterIO())
62 platform::recursiveCreateDirectory(m_path);
63 m_parallel_mng->barrier();
64 String filename = _getBasicVariableFile(m_version, m_path, rank);
65 m_text_writer =
makeRef(
new KeyValueTextWriter(traceMng(), filename, m_version));
66 m_text_writer->setDataCompressor(m_data_compressor);
67 m_text_writer->setHashAlgorithm(m_hash_algorithm);
71 if (!m_data_compressor.get()) {
72 String data_compressor_name = platform::getEnvironmentVariable(
"ARCANE_DEFLATER");
73 if (!data_compressor_name.null()) {
74 data_compressor_name = data_compressor_name +
"DataCompressor";
75 auto bc = _createDeflater(m_application, data_compressor_name);
76 info() <<
"Use data_compressor from environment variable ARCANE_DEFLATER name=" << data_compressor_name;
77 m_data_compressor = bc;
78 m_text_writer->setDataCompressor(bc);
83 if (!m_hash_algorithm.get()) {
84 String hash_algorithm_name = platform::getEnvironmentVariable(
"ARCANE_HASHALGORITHM");
85 if (hash_algorithm_name.null())
86 hash_algorithm_name =
"SHA3_256";
88 info() <<
"Use hash algorithm from environment variable ARCANE_HASHALGORITHM name=" << hash_algorithm_name;
89 hash_algorithm_name = hash_algorithm_name +
"HashAlgorithm";
90 auto v = _createHashAlgorithm(m_application, hash_algorithm_name);
92 m_text_writer->setHashAlgorithm(v);
96 if (!m_compare_hash_algorithm.get()) {
97 String algo_name = platform::getEnvironmentVariable(
"ARCANE_COMPAREHASHALGORITHM");
98 if (!algo_name.empty()) {
99 info() <<
"Use global hash algorithm from environment variable ARCANE_COMPAREHASHALGORITHM name=" << algo_name;
100 algo_name = algo_name +
"HashAlgorithm";
101 auto v = _createHashAlgorithm(m_application, algo_name);
102 m_compare_hash_algorithm = v;
106 m_global_writer =
new BasicGenericWriter(m_application, m_version, m_text_writer);
107 if (m_verbose_level > 0)
108 info() <<
"** OPEN MODE = " << m_open_mode;
124Ref<ParallelDataWriter> BasicWriter::
125_getWriter(IVariable* var)
127 return m_parallel_data_writers.getOrCreateWriter(var->itemGroup());
134_directWriteVal(IVariable* var, IData* data)
136 info(4) <<
"DIRECT WRITE VAL v=" << var->fullName();
138 IData* write_data = data;
143 Ref<IData> allocated_write_data;
144 const bool is_mesh_variable = (var->itemKind() != IK_Unknown);
145 if (is_mesh_variable) {
146 ItemGroup group = var->itemGroup();
147 if (m_want_parallel) {
148 Ref<ParallelDataWriter> writer = _getWriter(var);
149 written_unique_ids = writer->sortedUniqueIds();
150 allocated_write_data = writer->getSortedValues(data);
151 write_data = allocated_write_data.get();
156 _fillUniqueIds(group, sequential_written_unique_ids);
157 written_unique_ids = sequential_written_unique_ids.view();
160 if (m_written_groups.find(group) == m_written_groups.end()) {
161 info(5) <<
"WRITE GROUP " << group.name();
162 const IItemFamily* item_family = group.itemFamily();
163 const String& gname = group.name();
164 String group_full_name = item_family->fullName() +
"_" + gname;
165 _fillUniqueIds(group, wanted_unique_ids);
166 if (m_is_save_values)
167 m_global_writer->writeItemGroup(group_full_name, written_unique_ids, wanted_unique_ids.view());
168 m_written_groups.insert(group);
172 Ref<ISerializedData> sdata(write_data->createSerializedDataRef(
false));
174 if (is_mesh_variable) {
175 compare_hash = _computeCompareHash(var, write_data);
177 m_global_writer->writeData(var->fullName(), sdata.get(), compare_hash, m_is_save_values);
210 info() <<
"** WARNING: partial variable not implemented in BasicWriter";
213 _directWriteVal(var, data);
220setMetaData(
const String& meta_data)
224 if (m_version >= 3) {
227 String key_name =
"Global:CheckpointMetadata";
229 m_text_writer->write(key_name,
asBytes(bytes));
232 Int32 my_rank = m_parallel_mng->commRank();
233 String filename = _getMetaDataFileName(my_rank);
234 std::ofstream ofile(filename.
localstr());
246 Int32 my_rank = m_parallel_mng->commRank();
247 m_global_writer->initialize(m_path, my_rank);
256 const Int64 nb_part = m_parallel_mng->commSize();
263 jsw.writeKey(_getArcaneDBTag());
266 jsw.write(
"Version", (
Int64)m_version);
267 jsw.write(
"NbPart", nb_part);
268 jsw.write(
"HasValues", m_is_save_values);
270 String data_compressor_name;
271 Int64 data_compressor_min_size = 0;
272 if (m_data_compressor.get()) {
273 data_compressor_name = m_data_compressor->name();
274 data_compressor_min_size = m_data_compressor->minCompressSize();
276 jsw.write(
"DataCompressor", data_compressor_name);
277 jsw.write(
"DataCompressorMinSize", String::fromNumber(data_compressor_min_size));
282 if (m_hash_algorithm.get())
283 name = m_hash_algorithm->name();
284 jsw.write(
"HashAlgorithm", name);
290 if (m_compare_hash_algorithm.get())
291 name = m_compare_hash_algorithm->name();
292 jsw.write(
"ComparisonHashAlgorithm", name);
297 StringBuilder filename = m_path;
298 filename +=
"/arcane_acr_db.json";
299 String fn = filename.toString();
300 std::ofstream ofile(fn.localstr());
301 ofile << jsw.getBuffer();
310 const IParallelMng* pm = m_parallel_mng;
311 if (pm->isMasterIO()) {
312 if (m_version >= 3) {
316 Int64 nb_part = pm->commSize();
317 StringBuilder filename = m_path;
318 filename +=
"/infos.txt";
319 String fn = filename.toString();
320 std::ofstream ofile(fn.localstr());
321 ofile << nb_part <<
'\n';
324 m_global_writer->endWrite();
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Interface d'un algorithme de hashage.
virtual String computeComparisonHashCollective(IHashAlgorithm *hash_algo, IData *sorted_data)=0
Calcule de Hash de comparaison pour la variable.
Interface d'une variable.
virtual bool isPartial() const =0
Indique si la variable est partielle.
virtual IVariableInternal * _internalApi()=0
API interne à Arcane.
constexpr __host__ __device__ SizeType length() const noexcept
Nombre d'éléments du tableau.
Vue d'un tableau d'éléments de type T.
Chaîne de caractères unicode.
void writeBytes(std::ostream &o) const
Écrit la chaîne au format UTF-8 sur le flot o.
const char * localstr() const
Retourne la conversion de l'instance dans l'encodage UTF-8.
ByteConstArrayView utf8() const
Retourne la conversion de l'instance dans l'encodage UTF-8.
TraceMessage info() const
Flot pour un message d'information.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
std::int64_t Int64
Type entier signé sur 64 bits.
ConstArrayView< Int64 > Int64ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
impl::SpanTypeFromSize< conststd::byte, SizeType >::SpanType asBytes(const SpanImpl< DataType, SizeType, Extent > &s)
Converti la vue en un tableau d'octets non modifiables.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.