Arcane  v3.16.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
ParallelDataWriter.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ParallelDataWriter.cc (C) 2000-2025 */
9/* */
10/* Lecteur/Ecrivain de IData en parallèle. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/std/internal/ParallelDataWriter.h"
15
16#include "arcane/utils/Ref.h"
17#include "arcane/utils/Math.h"
18
19#include "arcane/core/IParallelMng.h"
20#include "arcane/core/IParallelExchanger.h"
21#include "arcane/core/ISerializer.h"
22#include "arcane/core/ISerializeMessage.h"
23#include "arcane/core/SerializeBuffer.h"
24#include "arcane/core/IData.h"
25#include "arcane/core/parallel/BitonicSortT.H"
26#include "arcane/core/ParallelMngUtils.h"
27#include "arcane/core/ItemGroup.h"
28#include "arcane/core/IItemFamily.h"
30
31/*---------------------------------------------------------------------------*/
32/*---------------------------------------------------------------------------*/
33
34namespace Arcane
35{
36
37/*---------------------------------------------------------------------------*/
38/*---------------------------------------------------------------------------*/
39
41: public TraceAccessor
42{
43 public:
44
45 explicit Impl(IParallelMng* pm);
46
47 public:
48
49 Int64ConstArrayView sortedUniqueIds() const;
50 void setGatherAll(bool v);
51
52 private:
53
54 IParallelMng* m_parallel_mng = nullptr;
59 //TODO ne pas utiliser un tableau dimensionné au commSize()
60 UniqueArray<UniqueArray<Int32>> m_indexes_to_send;
61 UniqueArray<UniqueArray<Int32>> m_indexes_to_recv;
62 Int32 m_nb_item = 0;
63 Int64UniqueArray m_sorted_unique_ids;
64
65 UniqueArray<Int32> m_local_indexes_to_send;
66 UniqueArray<Int32> m_local_indexes_to_recv;
67
68 bool m_gather_all = false;
69 bool m_is_verbose = false;
70
71 public:
72
73 void sort(Int32ConstArrayView local_ids,Int64ConstArrayView items_uid);
74
75 Ref<IData> getSortedValues(IData* data);
76};
77
78/*---------------------------------------------------------------------------*/
79/*---------------------------------------------------------------------------*/
80
81ParallelDataWriter::
82ParallelDataWriter(IParallelMng* pm)
83: m_p(new Impl(pm))
84{
85}
86
87ParallelDataWriter::
88~ParallelDataWriter()
89{
90 delete m_p;
91}
92
93Int64ConstArrayView ParallelDataWriter::
94sortedUniqueIds() const
95{
96 return m_p->sortedUniqueIds();
97}
98void ParallelDataWriter::
99setGatherAll(bool v)
100{
101 m_p->setGatherAll(v);
102}
103
104void ParallelDataWriter::
105sort(Int32ConstArrayView local_ids,Int64ConstArrayView items_uid)
106{
107 m_p->sort(local_ids,items_uid);
108}
109
110Ref<IData> ParallelDataWriter::
111getSortedValues(IData* data)
112{
113 return m_p->getSortedValues(data);
114}
115
116/*---------------------------------------------------------------------------*/
117/*---------------------------------------------------------------------------*/
118
119ParallelDataWriter::Impl::
120Impl(IParallelMng* pm)
121: TraceAccessor(pm->traceMng())
122, m_parallel_mng(pm)
123{
124}
125
126/*---------------------------------------------------------------------------*/
127/*---------------------------------------------------------------------------*/
128
129Int64ConstArrayView ParallelDataWriter::Impl::
130sortedUniqueIds() const
131{
132 return m_sorted_unique_ids;
133}
134
135/*---------------------------------------------------------------------------*/
136/*---------------------------------------------------------------------------*/
137
138void ParallelDataWriter::Impl::
139setGatherAll(bool v)
140{
141 m_gather_all = v;
142}
143
144/*---------------------------------------------------------------------------*/
145/*---------------------------------------------------------------------------*/
146
147void ParallelDataWriter::Impl::
148sort(Int32ConstArrayView local_ids,Int64ConstArrayView items_uid)
149{
150 IParallelMng* pm = m_parallel_mng;
151
152 Parallel::BitonicSort<Int64> uid_sorter(pm);
153 uid_sorter.sort(items_uid);
154
155 ConstArrayView<Int32> key_indexes = uid_sorter.keyIndexes();
156 ConstArrayView<Int32> key_ranks = uid_sorter.keyRanks();
157 ConstArrayView<Int64> keys = uid_sorter.keys();
158
159 UniqueArray<Int64> global_all_keys;
160 UniqueArray<Int32> global_all_key_indexes;
161 UniqueArray<Int32> global_all_key_ranks;
162
163 Int32 nb_item = keys.size();
164 const Int32 my_rank = pm->commRank();
165 const bool is_verbose = m_is_verbose;
166 if (is_verbose) {
167 for (Integer i = 0; i < math::min(nb_item, 20); ++i) {
168 info() << "ORIGINAL I=" << i << " UID=" << items_uid[i]
169 << " INDEX=" << key_indexes[i]
170 << " RANK=" << key_ranks[i]
171 << " KEY=" << keys[i];
172 }
173 }
174
175 if (m_gather_all){
176 // Le proc 0 récupère tout
177 pm->allGatherVariable(keys,global_all_keys);
178 pm->allGatherVariable(key_indexes,global_all_key_indexes);
179 pm->allGatherVariable(key_ranks,global_all_key_ranks);
180 Int32 gather_rank = 0;
181
182 if (pm->commRank()!=gather_rank){
183 global_all_key_ranks.clear();
184 global_all_key_indexes.clear();
185 global_all_keys.clear();
186 }
187 nb_item = global_all_keys.size();
188
189 key_ranks = global_all_key_ranks.view();
190 key_indexes = global_all_key_indexes.view();
191 keys = global_all_keys.view();
192 }
193
194 m_nb_item = nb_item;
195
196 m_sorted_unique_ids.resize(nb_item);
197 m_sorted_unique_ids.copy(keys);
198
199 //info() << "END SORT SIZE=" << nb_item << " KEY_SIZE=" << keys.size();
200 if (is_verbose) {
201 for (Integer i = 0; i < math::min(nb_item, 20); ++i) {
202 info() << "I=" << i << " KEY=" << keys[i]
203 << " INDEX=" << key_indexes[i]
204 << " RANK=" << key_ranks[i];
205 }
206 }
207 {
208 UniqueArray<UniqueArray<Int32>> indexes_list(pm->commSize());
209 UniqueArray<UniqueArray<Int32>> own_indexes_list(pm->commSize());
210 auto sd_exchange { ParallelMngUtils::createExchangerRef(pm) };
211
212 for( Integer i=0; i<nb_item; ++i ){
213 Int32 index = key_indexes[i];
214 Int32 rank = key_ranks[i];
215 if (rank != my_rank && indexes_list[rank].empty())
216 sd_exchange->addSender(rank);
217 indexes_list[rank].add(index);
218 own_indexes_list[rank].add(i);
219 }
220 m_local_indexes_to_recv = own_indexes_list[my_rank];
221 m_local_indexes_to_send.resize(indexes_list[my_rank].size());
222
223 sd_exchange->initializeCommunicationsMessages();
224 //info() << "NB SEND=" << sd_exchange->nbSender()
225 // << " NB_RECV=" << sd_exchange->nbReceiver();
226 Int32ConstArrayView send_sd = sd_exchange->senderRanks();
227 Integer nb_send = send_sd.size();
228 m_indexes_to_recv.resize(nb_send);
229 m_ranks_to_recv.resize(nb_send);
230 for( Integer i=0; i<nb_send; ++i ){
231 //info() << " SEND TO A: rank=" << send_sd[i];
232 ISerializeMessage* send_msg = sd_exchange->messageToSend(i);
233 Int32 dest_rank = send_sd[i];
234 ISerializer* serializer = send_msg->serializer();
235 m_indexes_to_recv[i] = own_indexes_list[dest_rank];
236 m_ranks_to_recv[i] = dest_rank;
237 serializer->setMode(ISerializer::ModeReserve);
238
239 serializer->reserveArray(indexes_list[dest_rank]);
240
241 serializer->allocateBuffer();
242 serializer->setMode(ISerializer::ModePut);
243
244 serializer->putArray(indexes_list[dest_rank]);
245 if (is_verbose) {
246 Integer nb_to_send = indexes_list[dest_rank].size();
247 for (Integer z = 0; z < nb_to_send; ++z) {
248 Integer index = indexes_list[dest_rank][z];
249 info() << " SEND Z=" << z << " RANK=" << dest_rank << " index=" << index;
250 }
251 }
252 }
253 sd_exchange->processExchange();
254
255 ConstArrayView<Int32> recv_sd = sd_exchange->receiverRanks();
256 const Int32 nb_recv = recv_sd.size();
257 m_indexes_to_send.resize(nb_recv);
258 m_ranks_to_send.resize(nb_recv);
259 for( Integer i=0; i<nb_recv; ++i ){
260 //info() << " RECEIVE FROM A: rank=" << recv_sd[i];
261 ISerializeMessage* recv_msg = sd_exchange->messageToReceive(i);
262 Int32 orig_rank = recv_sd[i];
263 ISerializer* serializer = recv_msg->serializer();
264 serializer->setMode(ISerializer::ModeGet);
265 Int32Array& recv_indexes = m_indexes_to_send[i];
266 m_ranks_to_send[i] = orig_rank;
267 serializer->getArray(recv_indexes);
268 const Int32 nb_to_recv = recv_indexes.size();
269 //info() << " RECEIVE FROM A: NB_TO_RECEIVE " << nb_to_recv << " S2=" << own_group_local_ids.size();
270 for( Integer z=0; z<nb_to_recv; ++z ){
271 Int32 index = recv_indexes[z];
272 //info() << " RECV Z=" << z << " RANK=" << orig_rank << " index=" << index
273 // << " index2=" << own_group_local_ids[index];
274 recv_indexes[z] = local_ids[index];
275 }
276 //info() << "END RECEIVE FROM A: NB_TO_RECEIVE " << nb_to_recv;
277 }
278
279 // Traite les entités locales
280 {
281 const Int32 nb_local = m_local_indexes_to_send.size();
282 for (Int32 z = 0; z < nb_local; ++z) {
283 Int32 index = indexes_list[my_rank][z];
284 //info() << " RECV Z=" << z << " RANK=" << orig_rank << " index=" << index
285 // << " index2=" << own_group_local_ids[index];
286 m_local_indexes_to_send[z] = local_ids[index];
287 }
288 }
289
290 }
291}
292
293/*---------------------------------------------------------------------------*/
294/*---------------------------------------------------------------------------*/
295
296Ref<IData> ParallelDataWriter::Impl::
297getSortedValues(IData* data)
298{
299 IParallelMng* pm = m_parallel_mng;
300 Ref<IData> sorted_data = data->cloneEmptyRef();
301
302 auto sd_exchange { ParallelMngUtils::createExchangerRef(pm) };
303 for (Int32 rank_to_send : m_ranks_to_send)
304 sd_exchange->addSender(rank_to_send);
305
306 UniqueArray<Int32> ranks_to_recv2;
307 for (Int32 rank_to_receive : m_ranks_to_recv)
308 ranks_to_recv2.add(rank_to_receive);
309
310 sd_exchange->initializeCommunicationsMessages(ranks_to_recv2);
311 Int32ConstArrayView send_sd = sd_exchange->senderRanks();
312 const Int32 nb_send = send_sd.size();
313 for( Integer i=0; i<nb_send; ++i ){
314 //info() << " SEND TO B: rank=" << send_sd[i];
315 ISerializeMessage* send_msg = sd_exchange->messageToSend(i);
316 //Int32 dest_rank = send_sd[i];
317 ISerializer* serializer = send_msg->serializer();
318 serializer->setMode(ISerializer::ModeReserve);
319 data->serialize(serializer,m_indexes_to_send[i],0);
320 serializer->allocateBuffer();
321 serializer->setMode(ISerializer::ModePut);
322 data->serialize(serializer,m_indexes_to_send[i],0);
323 }
324
325 sd_exchange->processExchange();
326
327 Int32ConstArrayView recv_sd = sd_exchange->receiverRanks();
328 const Int32 nb_recv = recv_sd.size();
329 sorted_data->resize(m_nb_item);
330 for( Integer i=0; i<nb_recv; ++i ){
331 //info() << " RECEIVE FROM B: rank=" << recv_sd[i];
332 ISerializeMessage* recv_msg = sd_exchange->messageToReceive(i);
333 //Int32 orig_rank = recv_sd[i];
334 ISerializer* serializer = recv_msg->serializer();
335 serializer->setMode(ISerializer::ModeGet);
336 sorted_data->serialize(serializer,m_indexes_to_recv[i],0);
337 }
338
339 // Traite les données qui sont déjà présentes sur ce processeur.
340 {
341 ConstArrayView<Int32> local_recv_indexes = m_local_indexes_to_recv;
342 const Int32 nb_local_index = local_recv_indexes.size();
343 if (nb_local_index>0){
344 SerializeBuffer sbuf;
345 sbuf.setMode(ISerializer::ModeReserve);
346 data->serialize(&sbuf, m_local_indexes_to_send, nullptr);
347 sbuf.allocateBuffer();
348 sbuf.setMode(ISerializer::ModePut);
349 data->serialize(&sbuf, m_local_indexes_to_send, nullptr);
350 sbuf.setMode(ISerializer::ModeGet);
351 sorted_data->serialize(&sbuf, local_recv_indexes, nullptr);
352 }
353 }
354 return sorted_data;
355}
356
357/*---------------------------------------------------------------------------*/
358/*---------------------------------------------------------------------------*/
359
360/*---------------------------------------------------------------------------*/
361/*---------------------------------------------------------------------------*/
362
363Ref<ParallelDataWriter> ParallelDataWriterList::
364getOrCreateWriter(const ItemGroup& group)
365{
366 auto i = m_data_writers.find(group);
367 if (i != m_data_writers.end())
368 return i->second;
369 IParallelMng* pm = group.itemFamily()->parallelMng();
370 Ref<ParallelDataWriter> writer = makeRef(new ParallelDataWriter(pm));
371 {
372 Int64UniqueArray items_uid;
373 ItemGroup own_group = group.own();
374 MeshUtils::fillUniqueIds(own_group.view(), items_uid);
375 Int32ConstArrayView local_ids = own_group.internal()->itemsLocalId();
376 writer->sort(local_ids, items_uid);
377 }
378 m_data_writers.try_emplace(group, writer);
379 return writer;
380}
381
382/*---------------------------------------------------------------------------*/
383/*---------------------------------------------------------------------------*/
384
385} // End namespace Arcane
386
387/*---------------------------------------------------------------------------*/
388/*---------------------------------------------------------------------------*/
Fonctions utilitaires sur le maillage.
Integer size() const
Nombre d'éléments du vecteur.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Interface d'une donnée.
Definition IData.h:33
Interface du gestionnaire de parallélisme pour un sous-domaine.
@ ModePut
Le sérialiseur attend des put()
UniqueArray< Int32 > m_ranks_to_recv
Tableau indiquant les rangs des process auxquels on envoie des infos.
UniqueArray< Int32 > m_ranks_to_send
Tableau indiquant les rangs des process dont on reçoit des infos.
Référence à une instance.
TraceAccessor(ITraceMng *m)
Construit un accesseur via le gestionnaire de trace m.
Vecteur 1D de données avec sémantique par valeur (style STL).
__host__ __device__ Real2 min(Real2 a, Real2 b)
Retourne le minimum de deux Real2.
Definition MathUtils.h:336
Ref< IParallelExchanger > createExchangerRef(IParallelMng *pm)
Retourne une interface pour transférer des messages entre rangs.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:426
Int32 Integer
Type représentant un entier.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:569
ConstArrayView< Int64 > Int64ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:567
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
Array< Int32 > Int32Array
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:214
std::int32_t Int32
Type entier signé sur 32 bits.