14#include "arcane/std/internal/BasicWriter.h"
16#include "arcane/utils/FatalErrorException.h"
17#include "arcane/utils/StringBuilder.h"
18#include "arcane/utils/PlatformUtils.h"
19#include "arcane/utils/JSONWriter.h"
20#include "arcane/utils/IDataCompressor.h"
21#include "arcane/utils/MemoryView.h"
22#include "arcane/utils/Ref.h"
23#include "arcane/utils/IHashAlgorithm.h"
25#include "arcane/core/IParallelMng.h"
26#include "arcane/core/ItemGroup.h"
27#include "arcane/core/IVariable.h"
28#include "arcane/core/IItemFamily.h"
29#include "arcane/core/IData.h"
30#include "arcane/core/internal/IVariableInternal.h"
32#include "arcane/std/internal/ParallelDataWriter.h"
44BasicWriter(IApplication* app, IParallelMng* pm,
const String& path,
45 eOpenMode open_mode,
Int32 version,
bool want_parallel)
47, m_want_parallel(want_parallel)
60 Int32 rank = m_parallel_mng->commRank();
61 if (m_open_mode == OpenModeTruncate && m_parallel_mng->isMasterIO())
62 platform::recursiveCreateDirectory(m_path);
63 m_parallel_mng->barrier();
64 String filename = _getBasicVariableFile(m_version, m_path, rank);
65 m_text_writer =
makeRef(
new KeyValueTextWriter(traceMng(), filename, m_version));
66 m_text_writer->setDataCompressor(m_data_compressor);
67 m_text_writer->setHashAlgorithm(m_hash_algorithm);
71 if (!m_data_compressor.get()) {
72 String data_compressor_name = platform::getEnvironmentVariable(
"ARCANE_DEFLATER");
73 if (!data_compressor_name.null()) {
74 data_compressor_name = data_compressor_name +
"DataCompressor";
75 auto bc = _createDeflater(m_application, data_compressor_name);
76 info() <<
"Use data_compressor from environment variable ARCANE_DEFLATER name=" << data_compressor_name;
77 m_data_compressor = bc;
78 m_text_writer->setDataCompressor(bc);
83 if (!m_hash_algorithm.get()) {
84 String hash_algorithm_name = platform::getEnvironmentVariable(
"ARCANE_HASHALGORITHM");
85 if (hash_algorithm_name.null())
86 hash_algorithm_name =
"SHA3_256";
88 info() <<
"Use hash algorithm from environment variable ARCANE_HASHALGORITHM name=" << hash_algorithm_name;
89 hash_algorithm_name = hash_algorithm_name +
"HashAlgorithm";
90 auto v = _createHashAlgorithm(m_application, hash_algorithm_name);
92 m_text_writer->setHashAlgorithm(v);
96 if (!m_compare_hash_algorithm.get()) {
97 String algo_name = platform::getEnvironmentVariable(
"ARCANE_COMPAREHASHALGORITHM");
98 if (!algo_name.empty()) {
99 info() <<
"Use global hash algorithm from environment variable ARCANE_COMPAREHASHALGORITHM name=" << algo_name;
100 algo_name = algo_name +
"HashAlgorithm";
101 auto v = _createHashAlgorithm(m_application, algo_name);
102 m_compare_hash_algorithm = v;
106 m_global_writer =
new BasicGenericWriter(m_application, m_version, m_text_writer);
107 if (m_verbose_level > 0)
108 info() <<
"** OPEN MODE = " << m_open_mode;
124Ref<ParallelDataWriter> BasicWriter::
125_getWriter(IVariable* var)
127 return m_parallel_data_writers.getOrCreateWriter(var->itemGroup());
134_directWriteVal(IVariable* var, IData* data)
136 info(4) <<
"DIRECT WRITE VAL v=" << var->fullName();
138 IData* write_data = data;
143 Ref<IData> allocated_write_data;
144 const bool is_mesh_variable = (var->itemKind() != IK_Unknown);
145 if (is_mesh_variable) {
146 ItemGroup group = var->itemGroup();
147 if (m_want_parallel) {
148 Ref<ParallelDataWriter> writer = _getWriter(var);
149 written_unique_ids = writer->sortedUniqueIds();
150 allocated_write_data = writer->getSortedValues(data);
151 write_data = allocated_write_data.get();
156 _fillUniqueIds(group, sequential_written_unique_ids);
157 written_unique_ids = sequential_written_unique_ids.view();
160 if (m_written_groups.find(group) == m_written_groups.end()) {
161 info(5) <<
"WRITE GROUP " << group.name();
162 const IItemFamily* item_family = group.itemFamily();
163 const String& gname = group.name();
164 String group_full_name = item_family->fullName() +
"_" + gname;
165 _fillUniqueIds(group, wanted_unique_ids);
166 if (m_is_save_values)
167 m_global_writer->writeItemGroup(group_full_name, written_unique_ids, wanted_unique_ids.view());
168 m_written_groups.insert(group);
172 Ref<ISerializedData> sdata(write_data->createSerializedDataRef(
false));
174 if (is_mesh_variable) {
175 compare_hash = _computeCompareHash(var, write_data);
177 m_global_writer->writeData(var->fullName(), sdata.get(), compare_hash, m_is_save_values);
209 info() <<
"** WARNING: partial variable not implemented in BasicWriter";
212 _directWriteVal(var, data);
219setMetaData(
const String& meta_data)
222 if (m_version >= 3) {
224 Int64 length = bytes.length();
225 String key_name =
"Global:CheckpointMetadata";
227 m_text_writer->write(key_name,
asBytes(bytes));
230 Int32 my_rank = m_parallel_mng->commRank();
231 String filename = _getMetaDataFileName(my_rank);
232 std::ofstream ofile(filename.
localstr(), ios::binary);
244 Int32 my_rank = m_parallel_mng->commRank();
245 m_global_writer->initialize(m_path, my_rank);
254 const Int64 nb_part = m_parallel_mng->commSize();
261 jsw.writeKey(_getArcaneDBTag());
264 jsw.write(
"Version", (
Int64)m_version);
265 jsw.write(
"NbPart", nb_part);
266 jsw.write(
"HasValues", m_is_save_values);
268 String data_compressor_name;
269 Int64 data_compressor_min_size = 0;
270 if (m_data_compressor.get()) {
271 data_compressor_name = m_data_compressor->name();
272 data_compressor_min_size = m_data_compressor->minCompressSize();
274 jsw.write(
"DataCompressor", data_compressor_name);
275 jsw.write(
"DataCompressorMinSize", String::fromNumber(data_compressor_min_size));
280 if (m_hash_algorithm.get())
281 name = m_hash_algorithm->name();
282 jsw.write(
"HashAlgorithm", name);
288 if (m_compare_hash_algorithm.get())
289 name = m_compare_hash_algorithm->name();
290 jsw.write(
"ComparisonHashAlgorithm", name);
295 StringBuilder filename = m_path;
296 filename +=
"/arcane_acr_db.json";
297 String fn = filename.toString();
298 std::ofstream ofile(fn.localstr());
299 ofile << jsw.getBuffer();
308 const IParallelMng* pm = m_parallel_mng;
309 if (pm->isMasterIO()) {
310 if (m_version >= 3) {
314 Int64 nb_part = pm->commSize();
315 StringBuilder filename = m_path;
316 filename +=
"/infos.txt";
317 String fn = filename.toString();
318 std::ofstream ofile(fn.localstr());
319 ofile << nb_part <<
'\n';
322 m_global_writer->endWrite();
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
Interface of a data item.
Interface of a hashing algorithm.
virtual String computeComparisonHashCollective(IHashAlgorithm *hash_algo, IData *sorted_data)=0
Calculates the comparison hash for the variable.
virtual bool isPartial() const =0
Indicates if the variable is partial.
virtual IVariableInternal * _internalApi()=0
Internal Arcane API.
View of an array of elements of type T.
Unicode character string.
void writeBytes(std::ostream &o) const
Writes the string in UTF-8 format to the stream o.
const char * localstr() const
Returns the conversion of the instance into UTF-8 encoding.
ByteConstArrayView utf8() const
Returns the conversion of the instance into UTF-8 encoding.
TraceMessage info() const
Flow for an information message.
UniqueArray< Int64 > Int64UniqueArray
Dynamic 1D array of 64-bit integers.
std::int64_t Int64
Signed integer type of 64 bits.
ConstArrayView< Int64 > Int64ConstArrayView
C equivalent of a 1D array of 64-bit integers.
Impl::SpanTypeFromSize< const std::byte, SizeType >::SpanType asBytes(const SpanImpl< DataType, SizeType, Extent > &s)
Converts the view into an array of non-modifiable bytes.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.