Arcane  v3.16.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
ParallelDataReader.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ParallelDataReader.cc (C) 2000-2024 */
9/* */
10/* Lecteur de IData en parallèle. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/std/internal/ParallelDataReader.h"
15
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/FixedArray.h"
19#include "arcane/utils/CheckedConvert.h"
20
21#include "arcane/core/IParallelMng.h"
22#include "arcane/core/IParallelExchanger.h"
23#include "arcane/core/ISerializer.h"
24#include "arcane/core/ISerializeMessage.h"
25#include "arcane/core/SerializeBuffer.h"
26#include "arcane/core/IData.h"
27#include "arcane/core/parallel/BitonicSortT.H"
28#include "arcane/core/ParallelMngUtils.h"
29
30/*---------------------------------------------------------------------------*/
31/*---------------------------------------------------------------------------*/
32
33namespace Arcane
34{
35
36/*---------------------------------------------------------------------------*/
37/*---------------------------------------------------------------------------*/
56: public TraceAccessor
57{
58 public:
59
60 explicit Impl(IParallelMng* pm);
61
62 public:
63
64 Int64Array& writtenUniqueIds() { return m_written_unique_ids; }
65 Int64Array& wantedUniqueIds() { return m_wanted_unique_ids; }
66
67 private:
68
69 IParallelMng* m_parallel_mng = nullptr;
70
71 Int32UniqueArray m_data_to_send_ranks;
72 //TODO ne pas utiliser un tableau dimensionné au commSize()
73 UniqueArray<SharedArray<Int32>> m_data_to_send_local_indexes;
74 UniqueArray<SharedArray<Int32>> m_data_to_recv_indexes;
75 Int64UniqueArray m_written_unique_ids;
76 Int64UniqueArray m_wanted_unique_ids;
77 Int32UniqueArray m_local_send_indexes;
78
79 public:
80
81 void sort();
82
83 public:
84
85 void getSortedValues(IData* written_data, IData* data);
86
87 private:
88
89 void _searchUniqueIdIndexes(Int64ConstArrayView recv_uids,
90 Int64ConstArrayView written_unique_ids,
91 Int32Array& indexes) const;
92};
93
94/*---------------------------------------------------------------------------*/
95/*---------------------------------------------------------------------------*/
96
97/*---------------------------------------------------------------------------*/
98/*---------------------------------------------------------------------------*/
99
100ParallelDataReader::
101ParallelDataReader(IParallelMng* pm)
102: m_p(new Impl(pm))
103{
104}
105
106ParallelDataReader::
107~ParallelDataReader()
108{
109 delete m_p;
110}
111
112Array<Int64>& ParallelDataReader::
113writtenUniqueIds()
114{
115 return m_p->writtenUniqueIds();
116}
117Array<Int64>& ParallelDataReader::
118wantedUniqueIds()
119{
120 return m_p->wantedUniqueIds();
121}
122void ParallelDataReader::
123sort()
124{
125 return m_p->sort();
126}
127void ParallelDataReader::
128getSortedValues(IData* written_data, IData* data)
129{
130 m_p->getSortedValues(written_data,data);
131}
132
133/*---------------------------------------------------------------------------*/
134/*---------------------------------------------------------------------------*/
135
136ParallelDataReader::Impl::
137Impl(IParallelMng* pm)
138: TraceAccessor(pm->traceMng())
139, m_parallel_mng(pm)
140{
141}
142
143/*---------------------------------------------------------------------------*/
144/*---------------------------------------------------------------------------*/
145
146void ParallelDataReader::Impl::
147sort()
148{
149 Int32 nb_wanted_uid = m_wanted_unique_ids.size();
150
151 Int32 nb_rank = m_parallel_mng->commSize();
152 Int32 my_rank = m_parallel_mng->commRank();
153
154 Int64UniqueArray global_min_max_uid(nb_rank*2);
155 {
156 FixedArray<Int64, 2> min_max_written_uid;
157 min_max_written_uid[0] = NULL_ITEM_UNIQUE_ID;
158 min_max_written_uid[1] = NULL_ITEM_UNIQUE_ID;
159 Integer nb_written_uid = m_written_unique_ids.size();
160
161 if (nb_written_uid!=0){
162 // Les uid écrits sont triés par ordre croissant.
163 // Le plus petit est donc le premier et le plus grand le dernier
164 min_max_written_uid[0] = m_written_unique_ids[0];
165 min_max_written_uid[1] = m_written_unique_ids[nb_written_uid-1];
166 }
167 m_parallel_mng->allGather(min_max_written_uid.view(), global_min_max_uid);
168 }
169 for( Integer irank=0; irank<nb_rank; ++irank )
170 info(5) << "MIN_MAX_UIDS p=" << irank << " min=" << global_min_max_uid[irank*2]
171 << " max=" << global_min_max_uid[(irank*2)+1];
172
173 m_data_to_recv_indexes.resize(nb_rank);
174 {
175 UniqueArray< SharedArray<Int64> > uids_list(nb_rank);
176 auto exchanger { ParallelMngUtils::createExchangerRef(m_parallel_mng) };
177 for( Integer i=0; i<nb_wanted_uid; ++i ){
178 Int64 uid = m_wanted_unique_ids[i];
179 Int32 rank = -1;
180 //TODO: utiliser une dichotomie
181 for( Int32 irank=0; irank<nb_rank; ++irank ){
182 if (uid>=global_min_max_uid[irank*2] && uid<=global_min_max_uid[(irank*2)+1]){
183 rank = irank;
184 break;
185 }
186 }
187 if (rank==(-1))
188 ARCANE_FATAL("Bad rank uid={0} uid_index={1}",uid,i);
189
190 // Il est inutile de s'envoyer les valeurs
191 if (rank!=my_rank){
192 if (uids_list[rank].empty()){
193 exchanger->addSender(rank);
194 }
195 uids_list[rank].add(uid);
196 }
197 m_data_to_recv_indexes[rank].add(i);
198 }
199 exchanger->initializeCommunicationsMessages();
200 //info() << "NB SEND=" << exchanger->nbSender()
201 // << " NB_RECV=" << exchanger->nbReceiver();
202 Int32ConstArrayView senders = exchanger->senderRanks();
203 Integer nb_send = senders.size();
204 for( Integer i=0; i<nb_send; ++i ){
205 //info() << "READ SEND TO A: rank=" << senders[i];
206 ISerializeMessage* send_msg = exchanger->messageToSend(i);
207 Int32 dest_rank = senders[i];
208 ISerializer* serializer = send_msg->serializer();
209 serializer->setMode(ISerializer::ModeReserve);
210 serializer->reserveArray(uids_list[dest_rank]);
211 serializer->allocateBuffer();
212 serializer->setMode(ISerializer::ModePut);
213 serializer->putArray(uids_list[dest_rank]);
214#if 0
215 for( Integer z=0; z<nb_to_send; ++z ){
216 Integer index = indexes_list[dest_rank][z];
217 info() << " SEND Z=" << z << " RANK=" << dest_rank << " index=" << index
218 << " own_index=" << indexes_to_recv[i][z];
219 }
220#endif
221 }
222 exchanger->processExchange();
223 Int32ConstArrayView receivers = exchanger->receiverRanks();
224 Integer nb_recv = receivers.size();
225
226 m_data_to_send_local_indexes.resize(nb_recv);
227 m_data_to_send_ranks.resize(nb_recv);
228
229 for( Integer i=0; i<nb_recv; ++i ){
230 //info() << "READ RECEIVE FROM A: rank=" << receivers[i];
231 ISerializeMessage* recv_msg = exchanger->messageToReceive(i);
232 Int32 orig_rank = receivers[i];
233 m_data_to_send_ranks[i] = orig_rank;
234 ISerializer* serializer = recv_msg->serializer();
235 serializer->setMode(ISerializer::ModeGet);
236 Int64UniqueArray recv_uids;
237 serializer->getArray(recv_uids);
238 Int64 nb_to_recv = recv_uids.largeSize();
239
240 m_data_to_send_local_indexes[i].resize(nb_to_recv);
241 _searchUniqueIdIndexes(recv_uids,m_written_unique_ids,m_data_to_send_local_indexes[i]);
242 }
243 }
244
245 // Traite les données qui sont déjà présentes sur ce processeur.
246 {
247 Int32Array& local_recv_indexes = m_data_to_recv_indexes[my_rank];
248 Integer nb_local_index = local_recv_indexes.size();
249 if (nb_local_index>0){
250 m_local_send_indexes.resize(nb_local_index);
251 Int64UniqueArray uids(nb_local_index);
252 for( Integer i=0; i<nb_local_index; ++i ){
253 uids[i] = m_wanted_unique_ids[local_recv_indexes[i]];
254 }
255 _searchUniqueIdIndexes(uids,m_written_unique_ids,m_local_send_indexes);
256 }
257 }
258}
259
260/*---------------------------------------------------------------------------*/
261/*---------------------------------------------------------------------------*/
262
263void ParallelDataReader::Impl::
264getSortedValues(IData* written_data,IData* data)
265{
266 auto exchanger { ParallelMngUtils::createExchangerRef(m_parallel_mng) };
267 Integer nb_send = m_data_to_send_ranks.size();
268 for( Integer i=0; i<nb_send; ++i ){
269 exchanger->addSender(m_data_to_send_ranks[i]);
270 }
271 //TODO utiliser version sans allGather() car on connait les receptionneurs
272 exchanger->initializeCommunicationsMessages();
273
274 for( Integer i=0; i<nb_send; ++i ){
275 ISerializeMessage* send_msg = exchanger->messageToSend(i);
276 //info() << " SEND TO B: rank=" << send_msg->destSubDomain();
277 //Int32 dest_rank = send_sd[i];
278 ISerializer* serializer = send_msg->serializer();
279 serializer->setMode(ISerializer::ModeReserve);
280 if (written_data)
281 written_data->serialize(serializer,m_data_to_send_local_indexes[i],0);
282 serializer->allocateBuffer();
283 serializer->setMode(ISerializer::ModePut);
284 if (written_data)
285 written_data->serialize(serializer,m_data_to_send_local_indexes[i],0);
286 }
287 exchanger->processExchange();
288
289 Integer nb_wanted_uid = m_wanted_unique_ids.size();
290 data->resize(nb_wanted_uid);
291
292 // Traite les données qui sont déjà présente sur ce processeur.
293 {
294 Integer my_rank = m_parallel_mng->commRank();
295 ConstArrayView<Int32> local_recv_indexes = m_data_to_recv_indexes[my_rank];
296 Integer nb_local_index = local_recv_indexes.size();
297 if (nb_local_index>0){
298 //info() << "SERIALIZE RESERVE";
299 SerializeBuffer sbuf;
300 sbuf.setMode(ISerializer::ModeReserve);
301 if (written_data)
302 written_data->serialize(&sbuf,m_local_send_indexes,0);
303 sbuf.allocateBuffer();
304 //info() << "SERIALIZE PUT";
305 sbuf.setMode(ISerializer::ModePut);
306 if (written_data)
307 written_data->serialize(&sbuf,m_local_send_indexes,0);
308 //info() << "SERIALIZE GET";
309 sbuf.setMode(ISerializer::ModeGet);
310 data->serialize(&sbuf,local_recv_indexes,0);
311 }
312 }
313
314 Int32ConstArrayView receivers = exchanger->receiverRanks();
315 Integer nb_recv = receivers.size();
316 for( Integer i=0; i<nb_recv; ++i ){
317 ISerializeMessage* recv_msg = exchanger->messageToReceive(i);
318 Int32 orig_rank = receivers[i];
319 //info() << " RECEIVE FROM B: rank=" << orig_rank;
320 ISerializer* serializer = recv_msg->serializer();
321 serializer->setMode(ISerializer::ModeGet);
322 data->serialize(serializer,m_data_to_recv_indexes[orig_rank],0);
323 }
324}
325
326/*---------------------------------------------------------------------------*/
327/*---------------------------------------------------------------------------*/
328
329void ParallelDataReader::Impl::
330_searchUniqueIdIndexes(Int64ConstArrayView recv_uids,
331 Int64ConstArrayView written_unique_ids,
332 Array<Int32>& indexes) const
333{
334 Integer nb_to_recv = recv_uids.size();
335 Integer nb_written_uid = written_unique_ids.size();
336
337 for( Integer irecv=0; irecv<nb_to_recv; ++irecv ){
338 Int64 my_uid = recv_uids[irecv];
339 // Comme les written_unique_ids sont triés, on peut utiliser une dichotomie
340 auto iter_end = written_unique_ids.end();
341 auto iter_begin = written_unique_ids.begin();
342 auto x2 = std::lower_bound(iter_begin, iter_end, my_uid);
343 if (x2 == iter_end)
344 ARCANE_FATAL("Can not find uid uid={0} (with binary_search)", my_uid);
345 Int32 my_index = CheckedConvert::toInt32(x2 - iter_begin);
346
347 // Teste si la dichotomie est correcte
348 if (written_unique_ids[my_index]!=my_uid)
349 ARCANE_FATAL("INTERNAL: bad index for bissection "
350 "Index={0} uid={1} wuid={2} n={3}",
351 my_index,my_uid,written_unique_ids[my_index],nb_written_uid);
352
353
354 //info() << "Index=" << my_index << " uid=" << my_uid << " n=" << nb_written_uid;
355 indexes[irecv] = my_index;
356 }
357}
358
359/*---------------------------------------------------------------------------*/
360/*---------------------------------------------------------------------------*/
361
362} // End namespace Arcane
363
364/*---------------------------------------------------------------------------*/
365/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Integer size() const
Nombre d'éléments du vecteur.
Int64 largeSize() const
Nombre d'éléments du vecteur (en 64 bits)
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Interface d'une donnée.
Definition IData.h:33
Interface du gestionnaire de parallélisme pour un sous-domaine.
@ ModePut
Le sérialiseur attend des reserve()
TraceAccessor(ITraceMng *m)
Construit un accesseur via le gestionnaire de trace m.
Vecteur 1D de données avec sémantique par valeur (style STL).
Int32 toInt32(Int64 v)
Converti un Int64 en un Int32.
Ref< IParallelExchanger > createExchangerRef(IParallelMng *pm)
Retourne une interface pour transférer des messages entre rangs.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Array< Int64 > Int64Array
Tableau dynamique à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:212
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:426
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:569
ConstArrayView< Int64 > Int64ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:567
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:428
Array< Int32 > Int32Array
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:214
std::int32_t Int32
Type entier signé sur 32 bits.