Arcane  v3.14.10.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
ParallelDataReader.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ParallelDataReader.cc (C) 2000-2024 */
9/* */
10/* Lecteur de IData en parallèle. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/std/internal/ParallelDataReader.h"
15
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/FixedArray.h"
19#include "arcane/utils/CheckedConvert.h"
20
21#include "arcane/core/IParallelMng.h"
22#include "arcane/core/IParallelExchanger.h"
23#include "arcane/core/ISerializer.h"
24#include "arcane/core/ISerializeMessage.h"
25#include "arcane/core/SerializeBuffer.h"
26#include "arcane/core/IData.h"
27#include "arcane/core/parallel/BitonicSortT.H"
28#include "arcane/core/ParallelMngUtils.h"
29
30/*---------------------------------------------------------------------------*/
31/*---------------------------------------------------------------------------*/
32
33namespace Arcane
34{
35
36/*---------------------------------------------------------------------------*/
37/*---------------------------------------------------------------------------*/
56: public TraceAccessor
57{
58 public:
59
60 explicit Impl(IParallelMng* pm);
61
62 public:
63
64 Int64Array& writtenUniqueIds() { return m_written_unique_ids; }
65 Int64Array& wantedUniqueIds() { return m_wanted_unique_ids; }
66
67 private:
68
69 IParallelMng* m_parallel_mng = nullptr;
70
71 Int32UniqueArray m_data_to_send_ranks;
72 //TODO ne pas utiliser un tableau dimensionné au commSize()
73 UniqueArray<SharedArray<Int32>> m_data_to_send_local_indexes;
74 UniqueArray<SharedArray<Int32>> m_data_to_recv_indexes;
75 Int64UniqueArray m_written_unique_ids;
76 Int64UniqueArray m_wanted_unique_ids;
77 Int32UniqueArray m_local_send_indexes;
78
79 public:
80
81 void sort();
82
83 public:
84
85 void getSortedValues(IData* written_data, IData* data);
86
87 private:
88
89 void _searchUniqueIdIndexes(Int64ConstArrayView recv_uids,
91 Int32Array& indexes) const;
92};
93
94/*---------------------------------------------------------------------------*/
95/*---------------------------------------------------------------------------*/
96
97/*---------------------------------------------------------------------------*/
98/*---------------------------------------------------------------------------*/
99
100ParallelDataReader::
101ParallelDataReader(IParallelMng* pm)
102: m_p(new Impl(pm))
103{
104}
105
106ParallelDataReader::
107~ParallelDataReader()
108{
109 delete m_p;
110}
111
112Array<Int64>& ParallelDataReader::
113writtenUniqueIds()
114{
115 return m_p->writtenUniqueIds();
116}
117Array<Int64>& ParallelDataReader::
118wantedUniqueIds()
119{
120 return m_p->wantedUniqueIds();
121}
122void ParallelDataReader::
123sort()
124{
125 return m_p->sort();
126}
127void ParallelDataReader::
128getSortedValues(IData* written_data, IData* data)
129{
130 m_p->getSortedValues(written_data,data);
131}
132
133/*---------------------------------------------------------------------------*/
134/*---------------------------------------------------------------------------*/
135
136ParallelDataReader::Impl::
137Impl(IParallelMng* pm)
138: TraceAccessor(pm->traceMng())
139, m_parallel_mng(pm)
140{
141}
142
143/*---------------------------------------------------------------------------*/
144/*---------------------------------------------------------------------------*/
145
146void ParallelDataReader::Impl::
147sort()
148{
149 Int32 nb_wanted_uid = m_wanted_unique_ids.size();
150
151 Int32 nb_rank = m_parallel_mng->commSize();
152 Int32 my_rank = m_parallel_mng->commRank();
153
154 Int64UniqueArray global_min_max_uid(nb_rank*2);
155 {
156 FixedArray<Int64, 2> min_max_written_uid;
157 min_max_written_uid[0] = NULL_ITEM_UNIQUE_ID;
158 min_max_written_uid[1] = NULL_ITEM_UNIQUE_ID;
159 Integer nb_written_uid = m_written_unique_ids.size();
160
161 if (nb_written_uid!=0){
162 // Les uid écrits sont triés par ordre croissant.
163 // Le plus petit est donc le premier et le plus grand le dernier
164 min_max_written_uid[0] = m_written_unique_ids[0];
165 min_max_written_uid[1] = m_written_unique_ids[nb_written_uid-1];
166 }
167 m_parallel_mng->allGather(min_max_written_uid.view(), global_min_max_uid);
168 }
169 for( Integer irank=0; irank<nb_rank; ++irank )
170 info(5) << "MIN_MAX_UIDS p=" << irank << " min=" << global_min_max_uid[irank*2]
171 << " max=" << global_min_max_uid[(irank*2)+1];
172
173 m_data_to_recv_indexes.resize(nb_rank);
174 {
175 UniqueArray< SharedArray<Int64> > uids_list(nb_rank);
176 auto exchanger { ParallelMngUtils::createExchangerRef(m_parallel_mng) };
177 for( Integer i=0; i<nb_wanted_uid; ++i ){
178 Int64 uid = m_wanted_unique_ids[i];
179 Int32 rank = -1;
180 //TODO: utiliser une dichotomie
181 for( Int32 irank=0; irank<nb_rank; ++irank ){
182 if (uid>=global_min_max_uid[irank*2] && uid<=global_min_max_uid[(irank*2)+1]){
183 rank = irank;
184 break;
185 }
186 }
187 if (rank==(-1))
188 ARCANE_FATAL("Bad rank uid={0} uid_index={1}",uid,i);
189
190 // Il est inutile de s'envoyer les valeurs
191 if (rank!=my_rank){
192 if (uids_list[rank].empty()){
193 exchanger->addSender(rank);
194 }
195 uids_list[rank].add(uid);
196 }
197 m_data_to_recv_indexes[rank].add(i);
198 }
199 exchanger->initializeCommunicationsMessages();
200 //info() << "NB SEND=" << exchanger->nbSender()
201 // << " NB_RECV=" << exchanger->nbReceiver();
202 Int32ConstArrayView senders = exchanger->senderRanks();
203 Integer nb_send = senders.size();
204 for( Integer i=0; i<nb_send; ++i ){
205 //info() << "READ SEND TO A: rank=" << senders[i];
206 ISerializeMessage* send_msg = exchanger->messageToSend(i);
207 Int32 dest_rank = senders[i];
208 ISerializer* serializer = send_msg->serializer();
209 serializer->setMode(ISerializer::ModeReserve);
210 serializer->reserveArray(uids_list[dest_rank]);
211 serializer->allocateBuffer();
212 serializer->setMode(ISerializer::ModePut);
213 serializer->putArray(uids_list[dest_rank]);
214#if 0
215 for( Integer z=0; z<nb_to_send; ++z ){
216 Integer index = indexes_list[dest_rank][z];
217 info() << " SEND Z=" << z << " RANK=" << dest_rank << " index=" << index
218 << " own_index=" << indexes_to_recv[i][z];
219 }
220#endif
221 }
222 exchanger->processExchange();
223 Int32ConstArrayView receivers = exchanger->receiverRanks();
224 Integer nb_recv = receivers.size();
225
226 m_data_to_send_local_indexes.resize(nb_recv);
227 m_data_to_send_ranks.resize(nb_recv);
228
229 for( Integer i=0; i<nb_recv; ++i ){
230 //info() << "READ RECEIVE FROM A: rank=" << receivers[i];
231 ISerializeMessage* recv_msg = exchanger->messageToReceive(i);
232 Int32 orig_rank = receivers[i];
233 m_data_to_send_ranks[i] = orig_rank;
234 ISerializer* serializer = recv_msg->serializer();
235 serializer->setMode(ISerializer::ModeGet);
236 Int64UniqueArray recv_uids;
237 serializer->getArray(recv_uids);
238 Int64 nb_to_recv = recv_uids.largeSize();
239
240 m_data_to_send_local_indexes[i].resize(nb_to_recv);
241 _searchUniqueIdIndexes(recv_uids,m_written_unique_ids,m_data_to_send_local_indexes[i]);
242 }
243 }
244
245 // Traite les données qui sont déjà présentes sur ce processeur.
246 {
247 Int32Array& local_recv_indexes = m_data_to_recv_indexes[my_rank];
248 Integer nb_local_index = local_recv_indexes.size();
249 if (nb_local_index>0){
250 m_local_send_indexes.resize(nb_local_index);
251 Int64UniqueArray uids(nb_local_index);
252 for( Integer i=0; i<nb_local_index; ++i ){
253 uids[i] = m_wanted_unique_ids[local_recv_indexes[i]];
254 }
255 _searchUniqueIdIndexes(uids,m_written_unique_ids,m_local_send_indexes);
256 }
257 }
258}
259
260/*---------------------------------------------------------------------------*/
261/*---------------------------------------------------------------------------*/
262
263void ParallelDataReader::Impl::
264getSortedValues(IData* written_data,IData* data)
265{
266 auto exchanger { ParallelMngUtils::createExchangerRef(m_parallel_mng) };
267 Integer nb_send = m_data_to_send_ranks.size();
268 for( Integer i=0; i<nb_send; ++i ){
269 exchanger->addSender(m_data_to_send_ranks[i]);
270 }
271 //TODO utiliser version sans allGather() car on connait les receptionneurs
272 exchanger->initializeCommunicationsMessages();
273
274 for( Integer i=0; i<nb_send; ++i ){
275 ISerializeMessage* send_msg = exchanger->messageToSend(i);
276 //info() << " SEND TO B: rank=" << send_msg->destSubDomain();
277 //Int32 dest_rank = send_sd[i];
278 ISerializer* serializer = send_msg->serializer();
279 serializer->setMode(ISerializer::ModeReserve);
280 if (written_data)
281 written_data->serialize(serializer,m_data_to_send_local_indexes[i],0);
282 serializer->allocateBuffer();
283 serializer->setMode(ISerializer::ModePut);
284 if (written_data)
285 written_data->serialize(serializer,m_data_to_send_local_indexes[i],0);
286 }
287 exchanger->processExchange();
288
289 Integer nb_wanted_uid = m_wanted_unique_ids.size();
290 data->resize(nb_wanted_uid);
291
292 // Traite les données qui sont déjà présente sur ce processeur.
293 {
294 Integer my_rank = m_parallel_mng->commRank();
295 ConstArrayView<Int32> local_recv_indexes = m_data_to_recv_indexes[my_rank];
296 Integer nb_local_index = local_recv_indexes.size();
297 if (nb_local_index>0){
298 //info() << "SERIALIZE RESERVE";
299 SerializeBuffer sbuf;
300 sbuf.setMode(ISerializer::ModeReserve);
301 if (written_data)
302 written_data->serialize(&sbuf,m_local_send_indexes,0);
303 sbuf.allocateBuffer();
304 //info() << "SERIALIZE PUT";
305 sbuf.setMode(ISerializer::ModePut);
306 if (written_data)
307 written_data->serialize(&sbuf,m_local_send_indexes,0);
308 //info() << "SERIALIZE GET";
309 sbuf.setMode(ISerializer::ModeGet);
310 data->serialize(&sbuf,local_recv_indexes,0);
311 }
312 }
313
314 Int32ConstArrayView receivers = exchanger->receiverRanks();
315 Integer nb_recv = receivers.size();
316 for( Integer i=0; i<nb_recv; ++i ){
317 ISerializeMessage* recv_msg = exchanger->messageToReceive(i);
318 Int32 orig_rank = receivers[i];
319 //info() << " RECEIVE FROM B: rank=" << orig_rank;
320 ISerializer* serializer = recv_msg->serializer();
321 serializer->setMode(ISerializer::ModeGet);
322 data->serialize(serializer,m_data_to_recv_indexes[orig_rank],0);
323 }
324}
325
326/*---------------------------------------------------------------------------*/
327/*---------------------------------------------------------------------------*/
328
329void ParallelDataReader::Impl::
330_searchUniqueIdIndexes(Int64ConstArrayView recv_uids,
331 Int64ConstArrayView written_unique_ids,
332 Array<Int32>& indexes) const
333{
334 Integer nb_to_recv = recv_uids.size();
335 Integer nb_written_uid = written_unique_ids.size();
336
337 for( Integer irecv=0; irecv<nb_to_recv; ++irecv ){
338 Int64 my_uid = recv_uids[irecv];
339 // Comme les written_unique_ids sont triés, on peut utiliser une dichotomie
340 auto iter_end = written_unique_ids.end();
341 auto iter_begin = written_unique_ids.begin();
342 auto x2 = std::lower_bound(iter_begin, iter_end, my_uid);
343 if (x2 == iter_end)
344 ARCANE_FATAL("Can not find uid uid={0} (with binary_search)", my_uid);
345 Int32 my_index = CheckedConvert::toInt32(x2 - iter_begin);
346
347 // Teste si la dichotomie est correcte
348 if (written_unique_ids[my_index]!=my_uid)
349 ARCANE_FATAL("INTERNAL: bad index for bissection "
350 "Index={0} uid={1} wuid={2} n={3}",
351 my_index,my_uid,written_unique_ids[my_index],nb_written_uid);
352
353
354 //info() << "Index=" << my_index << " uid=" << my_uid << " n=" << nb_written_uid;
355 indexes[irecv] = my_index;
356 }
357}
358
359/*---------------------------------------------------------------------------*/
360/*---------------------------------------------------------------------------*/
361
362} // End namespace Arcane
363
364/*---------------------------------------------------------------------------*/
365/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Tableau d'items de types quelconques.
Interface d'une donnée.
Definition IData.h:33
Interface du gestionnaire de parallélisme pour un sous-domaine.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:120
Vue constante d'un tableau de type T.
Vecteur 1D de données avec sémantique par valeur (style STL).
Int32 toInt32(Int64 v)
Converti un Int64 en un Int32.
Ref< IParallelExchanger > createExchangerRef(IParallelMng *pm)
Retourne une interface pour transférer des messages entre rangs.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:513
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:640
Array< Int32 > Int32Array
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:327
ConstArrayView< Int64 > Int64ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:638
Int32 Integer
Type représentant un entier.