Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
ParallelDataReader.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ParallelDataReader.cc (C) 2000-2024 */
9/* */
10/* Parallel IData Reader. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/std/internal/ParallelDataReader.h"
15
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/FixedArray.h"
19#include "arcane/utils/CheckedConvert.h"
20
21#include "arcane/core/IParallelMng.h"
22#include "arcane/core/IParallelExchanger.h"
23#include "arcane/core/ISerializer.h"
24#include "arcane/core/ISerializeMessage.h"
25#include "arcane/core/SerializeBuffer.h"
26#include "arcane/core/IData.h"
27#include "arcane/core/parallel/BitonicSortT.H"
28#include "arcane/core/ParallelMngUtils.h"
29
30/*---------------------------------------------------------------------------*/
31/*---------------------------------------------------------------------------*/
32
33namespace Arcane
34{
35
36/*---------------------------------------------------------------------------*/
37/*---------------------------------------------------------------------------*/
38
56: public TraceAccessor
57{
58 public:
59
60 explicit Impl(IParallelMng* pm);
61
62 public:
63
64 Int64Array& writtenUniqueIds() { return m_written_unique_ids; }
65 Int64Array& wantedUniqueIds() { return m_wanted_unique_ids; }
66
67 private:
68
69 IParallelMng* m_parallel_mng = nullptr;
70
71 Int32UniqueArray m_data_to_send_ranks;
72 //TODO do not use an array sized to commSize()
73 UniqueArray<SharedArray<Int32>> m_data_to_send_local_indexes;
74 UniqueArray<SharedArray<Int32>> m_data_to_recv_indexes;
75 Int64UniqueArray m_written_unique_ids;
76 Int64UniqueArray m_wanted_unique_ids;
77 Int32UniqueArray m_local_send_indexes;
78
79 public:
80
81 void sort();
82
83 public:
84
85 void getSortedValues(IData* written_data, IData* data);
86
87 private:
88
89 void _searchUniqueIdIndexes(Int64ConstArrayView recv_uids,
90 Int64ConstArrayView written_unique_ids,
91 Int32Array& indexes) const;
92};
93
94/*---------------------------------------------------------------------------*/
95/*---------------------------------------------------------------------------*/
96
97/*---------------------------------------------------------------------------*/
98/*---------------------------------------------------------------------------*/
99
100ParallelDataReader::
101ParallelDataReader(IParallelMng* pm)
102: m_p(new Impl(pm))
103{
104}
105
106ParallelDataReader::
107~ParallelDataReader()
108{
109 delete m_p;
110}
111
112Array<Int64>& ParallelDataReader::
113writtenUniqueIds()
114{
115 return m_p->writtenUniqueIds();
116}
117Array<Int64>& ParallelDataReader::
118wantedUniqueIds()
119{
120 return m_p->wantedUniqueIds();
121}
122void ParallelDataReader::
123sort()
124{
125 return m_p->sort();
126}
127void ParallelDataReader::
128getSortedValues(IData* written_data, IData* data)
129{
130 m_p->getSortedValues(written_data, data);
131}
132
133/*---------------------------------------------------------------------------*/
134/*---------------------------------------------------------------------------*/
135
136ParallelDataReader::Impl::
137Impl(IParallelMng* pm)
138: TraceAccessor(pm->traceMng())
139, m_parallel_mng(pm)
140{
141}
142
143/*---------------------------------------------------------------------------*/
144/*---------------------------------------------------------------------------*/
145
146void ParallelDataReader::Impl::
147sort()
148{
149 Int32 nb_wanted_uid = m_wanted_unique_ids.size();
150
151 Int32 nb_rank = m_parallel_mng->commSize();
152 Int32 my_rank = m_parallel_mng->commRank();
153
154 Int64UniqueArray global_min_max_uid(nb_rank * 2);
155 {
156 FixedArray<Int64, 2> min_max_written_uid;
157 min_max_written_uid[0] = NULL_ITEM_UNIQUE_ID;
158 min_max_written_uid[1] = NULL_ITEM_UNIQUE_ID;
159 Integer nb_written_uid = m_written_unique_ids.size();
160
161 if (nb_written_uid != 0) {
162 // The written uids are sorted in ascending order.
163 // The smallest is therefore the first and the largest the last
164 min_max_written_uid[0] = m_written_unique_ids[0];
165 min_max_written_uid[1] = m_written_unique_ids[nb_written_uid - 1];
166 }
167 m_parallel_mng->allGather(min_max_written_uid.view(), global_min_max_uid);
168 }
169 for (Integer irank = 0; irank < nb_rank; ++irank)
170 info(5) << "MIN_MAX_UIDS p=" << irank << " min=" << global_min_max_uid[irank * 2]
171 << " max=" << global_min_max_uid[(irank * 2) + 1];
172
173 m_data_to_recv_indexes.resize(nb_rank);
174 {
175 UniqueArray<SharedArray<Int64>> uids_list(nb_rank);
176 auto exchanger{ ParallelMngUtils::createExchangerRef(m_parallel_mng) };
177 for (Integer i = 0; i < nb_wanted_uid; ++i) {
178 Int64 uid = m_wanted_unique_ids[i];
179 Int32 rank = -1;
180 //TODO: use a dichotomy
181 for (Int32 irank = 0; irank < nb_rank; ++irank) {
182 if (uid >= global_min_max_uid[irank * 2] && uid <= global_min_max_uid[(irank * 2) + 1]) {
183 rank = irank;
184 break;
185 }
186 }
187 if (rank == (-1))
188 ARCANE_FATAL("Bad rank uid={0} uid_index={1}", uid, i);
189
190 // It is unnecessary to send the values
191 if (rank != my_rank) {
192 if (uids_list[rank].empty()) {
193 exchanger->addSender(rank);
194 }
195 uids_list[rank].add(uid);
196 }
197 m_data_to_recv_indexes[rank].add(i);
198 }
199 exchanger->initializeCommunicationsMessages();
200 //info() << "NB SEND=" << exchanger->nbSender()
201 // << " NB_RECV=" << exchanger->nbReceiver();
202 Int32ConstArrayView senders = exchanger->senderRanks();
203 Integer nb_send = senders.size();
204 for (Integer i = 0; i < nb_send; ++i) {
205 //info() << "READ SEND TO A: rank=" << senders[i];
206 ISerializeMessage* send_msg = exchanger->messageToSend(i);
207 Int32 dest_rank = senders[i];
208 ISerializer* serializer = send_msg->serializer();
209 serializer->setMode(ISerializer::ModeReserve);
210 serializer->reserveArray(uids_list[dest_rank]);
211 serializer->allocateBuffer();
212 serializer->setMode(ISerializer::ModePut);
213 serializer->putArray(uids_list[dest_rank]);
214#if 0
215 for( Integer z=0; z<nb_to_send; ++z ){
216 Integer index = indexes_list[dest_rank][z];
217 info() << " SEND Z=" << z << " RANK=" << dest_rank << " index=" << index
218 << " own_index=" << indexes_to_recv[i][z];
219 }
220#endif
221 }
222 exchanger->processExchange();
223 Int32ConstArrayView receivers = exchanger->receiverRanks();
224 Integer nb_recv = receivers.size();
225
226 m_data_to_send_local_indexes.resize(nb_recv);
227 m_data_to_send_ranks.resize(nb_recv);
228
229 for (Integer i = 0; i < nb_recv; ++i) {
230 //info() << "READ RECEIVE FROM A: rank=" << receivers[i];
231 ISerializeMessage* recv_msg = exchanger->messageToReceive(i);
232 Int32 orig_rank = receivers[i];
233 m_data_to_send_ranks[i] = orig_rank;
234 ISerializer* serializer = recv_msg->serializer();
235 serializer->setMode(ISerializer::ModeGet);
236 Int64UniqueArray recv_uids;
237 serializer->getArray(recv_uids);
238 Int64 nb_to_recv = recv_uids.largeSize();
239
240 m_data_to_send_local_indexes[i].resize(nb_to_recv);
241 _searchUniqueIdIndexes(recv_uids, m_written_unique_ids, m_data_to_send_local_indexes[i]);
242 }
243 }
244
245 // Processes the data that is already present on this processor.
246 {
247 Int32Array& local_recv_indexes = m_data_to_recv_indexes[my_rank];
248 Integer nb_local_index = local_recv_indexes.size();
249 if (nb_local_index > 0) {
250 m_local_send_indexes.resize(nb_local_index);
251 Int64UniqueArray uids(nb_local_index);
252 for (Integer i = 0; i < nb_local_index; ++i) {
253 uids[i] = m_wanted_unique_ids[local_recv_indexes[i]];
254 }
255 _searchUniqueIdIndexes(uids, m_written_unique_ids, m_local_send_indexes);
256 }
257 }
258}
259
260/*---------------------------------------------------------------------------*/
261/*---------------------------------------------------------------------------*/
262
263void ParallelDataReader::Impl::
264getSortedValues(IData* written_data, IData* data)
265{
266 auto exchanger{ ParallelMngUtils::createExchangerRef(m_parallel_mng) };
267 Integer nb_send = m_data_to_send_ranks.size();
268 for (Integer i = 0; i < nb_send; ++i) {
269 exchanger->addSender(m_data_to_send_ranks[i]);
270 }
271 //TODO use version without allGather() since we know the receivers
272 exchanger->initializeCommunicationsMessages();
273
274 for (Integer i = 0; i < nb_send; ++i) {
275 ISerializeMessage* send_msg = exchanger->messageToSend(i);
276 //info() << " SEND TO B: rank=" << send_msg->destSubDomain();
277 //Int32 dest_rank = send_sd[i];
278 ISerializer* serializer = send_msg->serializer();
279 serializer->setMode(ISerializer::ModeReserve);
280 if (written_data)
281 written_data->serialize(serializer, m_data_to_send_local_indexes[i], 0);
282 serializer->allocateBuffer();
283 serializer->setMode(ISerializer::ModePut);
284 if (written_data)
285 written_data->serialize(serializer, m_data_to_send_local_indexes[i], 0);
286 }
287 exchanger->processExchange();
288
289 Integer nb_wanted_uid = m_wanted_unique_ids.size();
290 data->resize(nb_wanted_uid);
291
292 // Processes the data that is already present on this processor.
293 {
294 Integer my_rank = m_parallel_mng->commRank();
295 ConstArrayView<Int32> local_recv_indexes = m_data_to_recv_indexes[my_rank];
296 Integer nb_local_index = local_recv_indexes.size();
297 if (nb_local_index > 0) {
298 //info() << "SERIALIZE RESERVE";
299 SerializeBuffer sbuf;
300 sbuf.setMode(ISerializer::ModeReserve);
301 if (written_data)
302 written_data->serialize(&sbuf, m_local_send_indexes, 0);
303 sbuf.allocateBuffer();
304 //info() << "SERIALIZE PUT";
305 sbuf.setMode(ISerializer::ModePut);
306 if (written_data)
307 written_data->serialize(&sbuf, m_local_send_indexes, 0);
308 //info() << "SERIALIZE GET";
309 sbuf.setMode(ISerializer::ModeGet);
310 data->serialize(&sbuf, local_recv_indexes, 0);
311 }
312 }
313
314 Int32ConstArrayView receivers = exchanger->receiverRanks();
315 Integer nb_recv = receivers.size();
316 for (Integer i = 0; i < nb_recv; ++i) {
317 ISerializeMessage* recv_msg = exchanger->messageToReceive(i);
318 Int32 orig_rank = receivers[i];
319 //info() << " RECEIVE FROM B: rank=" << orig_rank;
320 ISerializer* serializer = recv_msg->serializer();
321 serializer->setMode(ISerializer::ModeGet);
322 data->serialize(serializer, m_data_to_recv_indexes[orig_rank], 0);
323 }
324}
325
326/*---------------------------------------------------------------------------*/
327/*---------------------------------------------------------------------------*/
328
329void ParallelDataReader::Impl::
330_searchUniqueIdIndexes(Int64ConstArrayView recv_uids,
331 Int64ConstArrayView written_unique_ids,
332 Array<Int32>& indexes) const
333{
334 Integer nb_to_recv = recv_uids.size();
335 Integer nb_written_uid = written_unique_ids.size();
336
337 for (Integer irecv = 0; irecv < nb_to_recv; ++irecv) {
338 Int64 my_uid = recv_uids[irecv];
339 // Since written_unique_ids are sorted, we can use a dichotomy
340 auto iter_end = written_unique_ids.end();
341 auto iter_begin = written_unique_ids.begin();
342 auto x2 = std::lower_bound(iter_begin, iter_end, my_uid);
343 if (x2 == iter_end)
344 ARCANE_FATAL("Can not find uid uid={0} (with binary_search)", my_uid);
345 Int32 my_index = CheckedConvert::toInt32(x2 - iter_begin);
346
347 // Test if the dichotomy is correct
348 if (written_unique_ids[my_index] != my_uid)
349 ARCANE_FATAL("INTERNAL: bad index for bissection "
350 "Index={0} uid={1} wuid={2} n={3}",
351 my_index, my_uid, written_unique_ids[my_index], nb_written_uid);
352
353 //info() << "Index=" << my_index << " uid=" << my_uid << " n=" << nb_written_uid;
354 indexes[irecv] = my_index;
355 }
356}
357
358/*---------------------------------------------------------------------------*/
359/*---------------------------------------------------------------------------*/
360
361} // End namespace Arcane
362
363/*---------------------------------------------------------------------------*/
364/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
Integer size() const
Number of elements in the vector.
Int64 largeSize() const
Number of elements in the vector (in 64 bits).
constexpr Integer size() const noexcept
Number of elements in the array.
Interface of a data item.
Definition IData.h:34
Interface of the parallelism manager for a subdomain.
TraceAccessor(ITraceMng *m)
Constructs an accessor via the trace manager m.
1D data vector with value semantics (STL style).
Ref< IParallelExchanger > createExchangerRef(IParallelMng *pm)
Returns an interface to transfer messages between ranks.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
Array< Int64 > Int64Array
Dynamic one-dimensional array of 64-bit integers.
Definition UtilsTypes.h:125
UniqueArray< Int64 > Int64UniqueArray
Dynamic 1D array of 64-bit integers.
Definition UtilsTypes.h:339
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
Definition UtilsTypes.h:482
ConstArrayView< Int64 > Int64ConstArrayView
C equivalent of a 1D array of 64-bit integers.
Definition UtilsTypes.h:480
UniqueArray< Int32 > Int32UniqueArray
Dynamic 1D array of 32-bit integers.
Definition UtilsTypes.h:341
Array< Int32 > Int32Array
Dynamic one-dimensional array of 32-bit integers.
Definition UtilsTypes.h:127
std::int32_t Int32
Signed integer type of 32 bits.