Arcane  v3.15.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
ParallelDataWriter.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ParallelDataWriter.cc (C) 2000-2024 */
9/* */
10/* Lecteur/Ecrivain de IData en parallèle. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/std/internal/ParallelDataWriter.h"
15
16#include "arcane/utils/Ref.h"
17
18#include "arcane/core/IParallelMng.h"
19#include "arcane/core/IParallelExchanger.h"
20#include "arcane/core/ISerializer.h"
21#include "arcane/core/ISerializeMessage.h"
22#include "arcane/core/SerializeBuffer.h"
23#include "arcane/core/IData.h"
24#include "arcane/core/parallel/BitonicSortT.H"
25#include "arcane/core/ParallelMngUtils.h"
26#include "arcane/core/ItemGroup.h"
27#include "arcane/core/IItemFamily.h"
29
30/*---------------------------------------------------------------------------*/
31/*---------------------------------------------------------------------------*/
32
33namespace Arcane
34{
35
36/*---------------------------------------------------------------------------*/
37/*---------------------------------------------------------------------------*/
38
40: public TraceAccessor
41{
42 public:
43
44 explicit Impl(IParallelMng* pm);
45
46 public:
47
48 Int64ConstArrayView sortedUniqueIds() const;
49 void setGatherAll(bool v);
50
51 private:
52
53 IParallelMng* m_parallel_mng = nullptr;
58 //TODO ne pas utiliser un tableau dimensionné au commSize()
59 UniqueArray<UniqueArray<Int32>> m_indexes_to_send;
60 UniqueArray<UniqueArray<Int32>> m_indexes_to_recv;
61 Int32 m_nb_item = 0;
62 Int64UniqueArray m_sorted_unique_ids;
63
64 UniqueArray<Int32> m_local_indexes_to_send;
65 UniqueArray<Int32> m_local_indexes_to_recv;
66
67 bool m_gather_all = false;
68 bool m_is_verbose = false;
69
70 public:
71
73
74 Ref<IData> getSortedValues(IData* data);
75};
76
77/*---------------------------------------------------------------------------*/
78/*---------------------------------------------------------------------------*/
79
80ParallelDataWriter::
81ParallelDataWriter(IParallelMng* pm)
82: m_p(new Impl(pm))
83{
84}
85
86ParallelDataWriter::
87~ParallelDataWriter()
88{
89 delete m_p;
90}
91
92Int64ConstArrayView ParallelDataWriter::
93sortedUniqueIds() const
94{
95 return m_p->sortedUniqueIds();
96}
97void ParallelDataWriter::
98setGatherAll(bool v)
99{
100 m_p->setGatherAll(v);
101}
102
103void ParallelDataWriter::
104sort(Int32ConstArrayView local_ids,Int64ConstArrayView items_uid)
105{
106 m_p->sort(local_ids,items_uid);
107}
108
109Ref<IData> ParallelDataWriter::
110getSortedValues(IData* data)
111{
112 return m_p->getSortedValues(data);
113}
114
115/*---------------------------------------------------------------------------*/
116/*---------------------------------------------------------------------------*/
117
118ParallelDataWriter::Impl::
119Impl(IParallelMng* pm)
120: TraceAccessor(pm->traceMng())
121, m_parallel_mng(pm)
122{
123}
124
125/*---------------------------------------------------------------------------*/
126/*---------------------------------------------------------------------------*/
127
128Int64ConstArrayView ParallelDataWriter::Impl::
129sortedUniqueIds() const
130{
131 return m_sorted_unique_ids;
132}
133
134/*---------------------------------------------------------------------------*/
135/*---------------------------------------------------------------------------*/
136
137void ParallelDataWriter::Impl::
138setGatherAll(bool v)
139{
140 m_gather_all = v;
141}
142
143/*---------------------------------------------------------------------------*/
144/*---------------------------------------------------------------------------*/
145
146void ParallelDataWriter::Impl::
147sort(Int32ConstArrayView local_ids,Int64ConstArrayView items_uid)
148{
149 IParallelMng* pm = m_parallel_mng;
150
151 Parallel::BitonicSort<Int64> uid_sorter(pm);
152 uid_sorter.sort(items_uid);
153
154 ConstArrayView<Int32> key_indexes = uid_sorter.keyIndexes();
155 ConstArrayView<Int32> key_ranks = uid_sorter.keyRanks();
156 ConstArrayView<Int64> keys = uid_sorter.keys();
157
158 UniqueArray<Int64> global_all_keys;
159 UniqueArray<Int32> global_all_key_indexes;
160 UniqueArray<Int32> global_all_key_ranks;
161
162 Int32 nb_item = keys.size();
163 const Int32 my_rank = pm->commRank();
164 const bool is_verbose = m_is_verbose;
165 if (is_verbose) {
166 for (Integer i = 0; i < math::min(nb_item, 20); ++i) {
167 info() << "ORIGINAL I=" << i << " UID=" << items_uid[i]
168 << " INDEX=" << key_indexes[i]
169 << " RANK=" << key_ranks[i]
170 << " KEY=" << keys[i];
171 }
172 }
173
174 if (m_gather_all){
175 // Le proc 0 récupère tout
176 pm->allGatherVariable(keys,global_all_keys);
177 pm->allGatherVariable(key_indexes,global_all_key_indexes);
178 pm->allGatherVariable(key_ranks,global_all_key_ranks);
179 Int32 gather_rank = 0;
180
181 if (pm->commRank()!=gather_rank){
182 global_all_key_ranks.clear();
183 global_all_key_indexes.clear();
184 global_all_keys.clear();
185 }
186 nb_item = global_all_keys.size();
187
188 key_ranks = global_all_key_ranks.view();
189 key_indexes = global_all_key_indexes.view();
190 keys = global_all_keys.view();
191 }
192
193 m_nb_item = nb_item;
194
195 m_sorted_unique_ids.resize(nb_item);
196 m_sorted_unique_ids.copy(keys);
197
198 //info() << "END SORT SIZE=" << nb_item << " KEY_SIZE=" << keys.size();
199 if (is_verbose) {
200 for (Integer i = 0; i < math::min(nb_item, 20); ++i) {
201 info() << "I=" << i << " KEY=" << keys[i]
202 << " INDEX=" << key_indexes[i]
203 << " RANK=" << key_ranks[i];
204 }
205 }
206 {
207 UniqueArray<UniqueArray<Int32>> indexes_list(pm->commSize());
208 UniqueArray<UniqueArray<Int32>> own_indexes_list(pm->commSize());
209 auto sd_exchange { ParallelMngUtils::createExchangerRef(pm) };
210
211 for( Integer i=0; i<nb_item; ++i ){
212 Int32 index = key_indexes[i];
213 Int32 rank = key_ranks[i];
214 if (rank != my_rank && indexes_list[rank].empty())
215 sd_exchange->addSender(rank);
216 indexes_list[rank].add(index);
217 own_indexes_list[rank].add(i);
218 }
219 m_local_indexes_to_recv = own_indexes_list[my_rank];
220 m_local_indexes_to_send.resize(indexes_list[my_rank].size());
221
222 sd_exchange->initializeCommunicationsMessages();
223 //info() << "NB SEND=" << sd_exchange->nbSender()
224 // << " NB_RECV=" << sd_exchange->nbReceiver();
225 Int32ConstArrayView send_sd = sd_exchange->senderRanks();
226 Integer nb_send = send_sd.size();
227 m_indexes_to_recv.resize(nb_send);
228 m_ranks_to_recv.resize(nb_send);
229 for( Integer i=0; i<nb_send; ++i ){
230 //info() << " SEND TO A: rank=" << send_sd[i];
231 ISerializeMessage* send_msg = sd_exchange->messageToSend(i);
232 Int32 dest_rank = send_sd[i];
233 ISerializer* serializer = send_msg->serializer();
234 m_indexes_to_recv[i] = own_indexes_list[dest_rank];
235 m_ranks_to_recv[i] = dest_rank;
236 serializer->setMode(ISerializer::ModeReserve);
237
238 serializer->reserveArray(indexes_list[dest_rank]);
239
240 serializer->allocateBuffer();
241 serializer->setMode(ISerializer::ModePut);
242
243 serializer->putArray(indexes_list[dest_rank]);
244 if (is_verbose) {
245 Integer nb_to_send = indexes_list[dest_rank].size();
246 for (Integer z = 0; z < nb_to_send; ++z) {
247 Integer index = indexes_list[dest_rank][z];
248 info() << " SEND Z=" << z << " RANK=" << dest_rank << " index=" << index;
249 }
250 }
251 }
252 sd_exchange->processExchange();
253
254 ConstArrayView<Int32> recv_sd = sd_exchange->receiverRanks();
255 const Int32 nb_recv = recv_sd.size();
256 m_indexes_to_send.resize(nb_recv);
257 m_ranks_to_send.resize(nb_recv);
258 for( Integer i=0; i<nb_recv; ++i ){
259 //info() << " RECEIVE FROM A: rank=" << recv_sd[i];
260 ISerializeMessage* recv_msg = sd_exchange->messageToReceive(i);
261 Int32 orig_rank = recv_sd[i];
262 ISerializer* serializer = recv_msg->serializer();
263 serializer->setMode(ISerializer::ModeGet);
264 Int32Array& recv_indexes = m_indexes_to_send[i];
265 m_ranks_to_send[i] = orig_rank;
266 serializer->getArray(recv_indexes);
267 const Int32 nb_to_recv = recv_indexes.size();
268 //info() << " RECEIVE FROM A: NB_TO_RECEIVE " << nb_to_recv << " S2=" << own_group_local_ids.size();
269 for( Integer z=0; z<nb_to_recv; ++z ){
270 Int32 index = recv_indexes[z];
271 //info() << " RECV Z=" << z << " RANK=" << orig_rank << " index=" << index
272 // << " index2=" << own_group_local_ids[index];
273 recv_indexes[z] = local_ids[index];
274 }
275 //info() << "END RECEIVE FROM A: NB_TO_RECEIVE " << nb_to_recv;
276 }
277
278 // Traite les entités locales
279 {
280 const Int32 nb_local = m_local_indexes_to_send.size();
281 for (Int32 z = 0; z < nb_local; ++z) {
282 Int32 index = indexes_list[my_rank][z];
283 //info() << " RECV Z=" << z << " RANK=" << orig_rank << " index=" << index
284 // << " index2=" << own_group_local_ids[index];
285 m_local_indexes_to_send[z] = local_ids[index];
286 }
287 }
288
289 }
290}
291
292/*---------------------------------------------------------------------------*/
293/*---------------------------------------------------------------------------*/
294
295Ref<IData> ParallelDataWriter::Impl::
296getSortedValues(IData* data)
297{
298 IParallelMng* pm = m_parallel_mng;
299 Ref<IData> sorted_data = data->cloneEmptyRef();
300
301 auto sd_exchange { ParallelMngUtils::createExchangerRef(pm) };
302 for (Int32 rank_to_send : m_ranks_to_send)
303 sd_exchange->addSender(rank_to_send);
304
305 UniqueArray<Int32> ranks_to_recv2;
306 for (Int32 rank_to_receive : m_ranks_to_recv)
307 ranks_to_recv2.add(rank_to_receive);
308
309 sd_exchange->initializeCommunicationsMessages(ranks_to_recv2);
310 Int32ConstArrayView send_sd = sd_exchange->senderRanks();
311 const Int32 nb_send = send_sd.size();
312 for( Integer i=0; i<nb_send; ++i ){
313 //info() << " SEND TO B: rank=" << send_sd[i];
314 ISerializeMessage* send_msg = sd_exchange->messageToSend(i);
315 //Int32 dest_rank = send_sd[i];
316 ISerializer* serializer = send_msg->serializer();
317 serializer->setMode(ISerializer::ModeReserve);
318 data->serialize(serializer,m_indexes_to_send[i],0);
319 serializer->allocateBuffer();
320 serializer->setMode(ISerializer::ModePut);
321 data->serialize(serializer,m_indexes_to_send[i],0);
322 }
323
324 sd_exchange->processExchange();
325
326 Int32ConstArrayView recv_sd = sd_exchange->receiverRanks();
327 const Int32 nb_recv = recv_sd.size();
328 sorted_data->resize(m_nb_item);
329 for( Integer i=0; i<nb_recv; ++i ){
330 //info() << " RECEIVE FROM B: rank=" << recv_sd[i];
331 ISerializeMessage* recv_msg = sd_exchange->messageToReceive(i);
332 //Int32 orig_rank = recv_sd[i];
333 ISerializer* serializer = recv_msg->serializer();
334 serializer->setMode(ISerializer::ModeGet);
335 sorted_data->serialize(serializer,m_indexes_to_recv[i],0);
336 }
337
338 // Traite les données qui sont déjà présentes sur ce processeur.
339 {
340 ConstArrayView<Int32> local_recv_indexes = m_local_indexes_to_recv;
341 const Int32 nb_local_index = local_recv_indexes.size();
342 if (nb_local_index>0){
343 SerializeBuffer sbuf;
344 sbuf.setMode(ISerializer::ModeReserve);
345 data->serialize(&sbuf, m_local_indexes_to_send, nullptr);
346 sbuf.allocateBuffer();
347 sbuf.setMode(ISerializer::ModePut);
348 data->serialize(&sbuf, m_local_indexes_to_send, nullptr);
349 sbuf.setMode(ISerializer::ModeGet);
350 sorted_data->serialize(&sbuf, local_recv_indexes, nullptr);
351 }
352 }
353 return sorted_data;
354}
355
356/*---------------------------------------------------------------------------*/
357/*---------------------------------------------------------------------------*/
358
359/*---------------------------------------------------------------------------*/
360/*---------------------------------------------------------------------------*/
361
362Ref<ParallelDataWriter> ParallelDataWriterList::
363getOrCreateWriter(const ItemGroup& group)
364{
365 auto i = m_data_writers.find(group);
366 if (i != m_data_writers.end())
367 return i->second;
368 IParallelMng* pm = group.itemFamily()->parallelMng();
369 Ref<ParallelDataWriter> writer = makeRef(new ParallelDataWriter(pm));
370 {
371 Int64UniqueArray items_uid;
372 ItemGroup own_group = group.own();
373 MeshUtils::fillUniqueIds(own_group.view(), items_uid);
374 Int32ConstArrayView local_ids = own_group.internal()->itemsLocalId();
375 writer->sort(local_ids, items_uid);
376 }
377 m_data_writers.try_emplace(group, writer);
378 return writer;
379}
380
381/*---------------------------------------------------------------------------*/
382/*---------------------------------------------------------------------------*/
383
384} // End namespace Arcane
385
386/*---------------------------------------------------------------------------*/
387/*---------------------------------------------------------------------------*/
Fonctions utilitaires sur le maillage.
Interface d'une donnée.
Definition IData.h:33
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual void allGatherVariable(ConstArrayView< char > send_buf, Array< char > &recv_buf)=0
Effectue un regroupement sur tous les processeurs.
virtual Int32 commSize() const =0
Nombre d'instance dans le communicateur.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:149
UniqueArray< Int32 > m_ranks_to_recv
Tableau indiquant les rangs des process auxquels on envoie des infos.
UniqueArray< Int32 > m_ranks_to_send
Tableau indiquant les rangs des process dont on recoit des infos.
Vue constante d'un tableau de type T.
Vecteur 1D de données avec sémantique par valeur (style STL).
ARCCORE_HOST_DEVICE Real2 min(Real2 a, Real2 b)
Retourne le minimum de deux Real2.
Definition MathUtils.h:336
Ref< IParallelExchanger > createExchangerRef(IParallelMng *pm)
Retourne une interface pour transférer des messages entre rangs.
void add(ArrayView< T > lhs, ConstArrayView< T > copy_array)
Ajoute le tableau copy_array dans l'instance.
Definition MathUtils.h:885
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:550
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:693
Array< Int32 > Int32Array
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:338
ConstArrayView< Int64 > Int64ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:691
Int32 Integer
Type représentant un entier.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.