14#include "arcane/std/internal/ParallelDataReader.h"
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/FixedArray.h"
19#include "arcane/utils/CheckedConvert.h"
21#include "arcane/core/IParallelMng.h"
22#include "arcane/core/IParallelExchanger.h"
23#include "arcane/core/ISerializer.h"
24#include "arcane/core/ISerializeMessage.h"
25#include "arcane/core/SerializeBuffer.h"
26#include "arcane/core/IData.h"
27#include "arcane/core/parallel/BitonicSortT.H"
28#include "arcane/core/ParallelMngUtils.h"
// Returns a mutable reference to the m_written_unique_ids member array,
// so callers can read or fill it in place.
64 Int64Array& writtenUniqueIds() {
return m_written_unique_ids; }
// Returns a mutable reference to the m_wanted_unique_ids member array,
// so callers can read or fill it in place.
65 Int64Array& wantedUniqueIds() {
return m_wanted_unique_ids; }
// Public accessor of ParallelDataReader (its name line is not visible in this
// view; the body shows it forwards to Impl::writtenUniqueIds()).
// Returns, by reference, the array held by the implementation object m_p.
112Array<Int64>& ParallelDataReader::
115 return m_p->writtenUniqueIds();
// Public accessor of ParallelDataReader (its name line is not visible in this
// view; the body shows it forwards to Impl::wantedUniqueIds()).
// Returns, by reference, the array held by the implementation object m_p.
117Array<Int64>& ParallelDataReader::
120 return m_p->wantedUniqueIds();
122void ParallelDataReader::
// Public entry point: delegates unchanged to the implementation object,
// passing the same (written_data, data) pair to Impl::getSortedValues().
127void ParallelDataReader::
128getSortedValues(IData* written_data, IData* data)
130 m_p->getSortedValues(written_data,data);
// Constructor of the implementation class.
// pm: parallel manager; its trace manager (pm->traceMng()) initialises the
// TraceAccessor base. Any further initialisers and the body are not visible
// in this view.
136ParallelDataReader::Impl::
137Impl(IParallelMng* pm)
138: TraceAccessor(pm->traceMng())
// Implementation method whose name line is not visible in this view
// (presumably Impl::sort() — TODO confirm). Several lines of the body are
// also missing from this view, so the notes below describe only what the
// visible statements do:
//  1. gather, for every rank, the first and last entry of its written uids;
//  2. attach each wanted uid to a rank and build per-rank uid lists;
//  3. send those uid lists, receive the lists other ranks ask of us, and
//     translate received uids into indexes in m_written_unique_ids;
//  4. handle the uids that are served by the local rank itself.
// Fills m_data_to_recv_indexes, m_data_to_send_ranks,
// m_data_to_send_local_indexes and m_local_send_indexes.
146void ParallelDataReader::Impl::
149 Int32 nb_wanted_uid = m_wanted_unique_ids.size();
151 Int32 nb_rank = m_parallel_mng->commSize();
152 Int32 my_rank = m_parallel_mng->commRank();
// Slot 0 = lowest written uid, slot 1 = highest; both stay at
// NULL_ITEM_UNIQUE_ID when this rank has written nothing.
156 FixedArray<Int64, 2> min_max_written_uid;
157 min_max_written_uid[0] = NULL_ITEM_UNIQUE_ID;
158 min_max_written_uid[1] = NULL_ITEM_UNIQUE_ID;
159 Integer nb_written_uid = m_written_unique_ids.size();
// Taking the first/last element as min/max presumably relies on
// m_written_unique_ids being sorted — TODO confirm with the caller.
161 if (nb_written_uid!=0){
164 min_max_written_uid[0] = m_written_unique_ids[0];
165 min_max_written_uid[1] = m_written_unique_ids[nb_written_uid-1];
// After the gather, global_min_max_uid holds two values per rank:
// [rank*2] is the min and [rank*2+1] is the max (see the indexing below).
167 m_parallel_mng->allGather(min_max_written_uid.view(), global_min_max_uid);
// Trace (level 5) of the uid range owned by every rank.
169 for( Integer irank=0; irank<nb_rank; ++irank )
170 info(5) <<
"MIN_MAX_UIDS p=" << irank <<
" min=" << global_min_max_uid[irank*2]
171 <<
" max=" << global_min_max_uid[(irank*2)+1];
// One list of positions (indexes into m_wanted_unique_ids) per source rank.
173 m_data_to_recv_indexes.resize(nb_rank);
// One list of requested uids per rank.
175 UniqueArray< SharedArray<Int64> > uids_list(nb_rank);
177 for( Integer i=0; i<nb_wanted_uid; ++i ){
178 Int64 uid = m_wanted_unique_ids[i];
// Look for a rank whose [min,max] interval contains uid. The statement
// that assigns 'rank' is not visible in this view.
181 for(
Int32 irank=0; irank<nb_rank; ++irank ){
182 if (uid>=global_min_max_uid[irank*2] && uid<=global_min_max_uid[(irank*2)+1]){
// Register 'rank' with the exchanger the first time a uid is assigned to it.
192 if (uids_list[rank].empty()){
193 exchanger->addSender(rank);
195 uids_list[rank].add(uid);
// Remember where, in the wanted list, the value coming from 'rank' must land.
197 m_data_to_recv_indexes[rank].add(i);
199 exchanger->initializeCommunicationsMessages();
// Fill each outgoing message with the uids requested from its destination rank.
203 Integer nb_send = senders.size();
204 for( Integer i=0; i<nb_send; ++i ){
206 ISerializeMessage* send_msg = exchanger->messageToSend(i);
207 Int32 dest_rank = senders[i];
208 ISerializer* serializer = send_msg->serializer();
// Reserve pass, then buffer allocation, then the actual write. The switch
// out of ModeReserve before putArray() is presumably on a line not visible here.
209 serializer->setMode(ISerializer::ModeReserve);
210 serializer->reserveArray(uids_list[dest_rank]);
211 serializer->allocateBuffer();
213 serializer->putArray(uids_list[dest_rank]);
// NOTE(review): nb_to_send, indexes_list and indexes_to_recv are not declared
// in any visible line — this trace loop may belong to a disabled/debug
// section; confirm against the full file.
215 for( Integer z=0; z<nb_to_send; ++z ){
216 Integer index = indexes_list[dest_rank][z];
217 info() <<
" SEND Z=" << z <<
" RANK=" << dest_rank <<
" index=" << index
218 <<
" own_index=" << indexes_to_recv[i][z];
222 exchanger->processExchange();
// One entry per rank that requested uids from us.
224 Integer nb_recv = receivers.size();
226 m_data_to_send_local_indexes.resize(nb_recv);
227 m_data_to_send_ranks.resize(nb_recv);
229 for( Integer i=0; i<nb_recv; ++i ){
231 ISerializeMessage* recv_msg = exchanger->messageToReceive(i);
232 Int32 orig_rank = receivers[i];
233 m_data_to_send_ranks[i] = orig_rank;
234 ISerializer* serializer = recv_msg->serializer();
// Read the uids requested by orig_rank and convert them to positions in
// m_written_unique_ids; the output array is sized first because the search
// helper writes it by index.
237 serializer->getArray(recv_uids);
238 Int64 nb_to_recv = recv_uids.largeSize();
240 m_data_to_send_local_indexes[i].resize(nb_to_recv);
241 _searchUniqueIdIndexes(recv_uids,m_written_unique_ids,m_data_to_send_local_indexes[i]);
// Uids served by this very rank are resolved locally, without a message.
247 Int32Array& local_recv_indexes = m_data_to_recv_indexes[my_rank];
248 Integer nb_local_index = local_recv_indexes.size();
249 if (nb_local_index>0){
250 m_local_send_indexes.resize(nb_local_index);
// 'uids' is declared on a line not visible in this view.
252 for( Integer i=0; i<nb_local_index; ++i ){
253 uids[i] = m_wanted_unique_ids[local_recv_indexes[i]];
255 _searchUniqueIdIndexes(uids,m_written_unique_ids,m_local_send_indexes);
// Redistributes values: reads entries of 'written_data' at the indexes stored
// in m_data_to_send_local_indexes / m_local_send_indexes and writes them into
// 'data' at the positions stored in m_data_to_recv_indexes.
// written_data: source values. data: destination, resized below to the number
// of wanted uids.
// Some lines of the body (exchanger creation, serializer mode switches,
// braces) are not visible in this view.
263void ParallelDataReader::Impl::
264getSortedValues(IData* written_data,IData* data)
// Declare one outgoing message per rank recorded in m_data_to_send_ranks.
267 Integer nb_send = m_data_to_send_ranks.size();
268 for( Integer i=0; i<nb_send; ++i ){
269 exchanger->addSender(m_data_to_send_ranks[i]);
272 exchanger->initializeCommunicationsMessages();
274 for( Integer i=0; i<nb_send; ++i ){
275 ISerializeMessage* send_msg = exchanger->messageToSend(i);
278 ISerializer* serializer = send_msg->serializer();
// The same serialize() call is issued twice: once in ModeReserve, and once
// after allocateBuffer() — presumably after a mode change on a line not
// visible here — to actually write the values.
279 serializer->setMode(ISerializer::ModeReserve);
281 written_data->serialize(serializer,m_data_to_send_local_indexes[i],0);
282 serializer->allocateBuffer();
285 written_data->serialize(serializer,m_data_to_send_local_indexes[i],0);
287 exchanger->processExchange();
289 Integer nb_wanted_uid = m_wanted_unique_ids.size();
290 data->resize(nb_wanted_uid);
// Values served by this rank itself go through a local SerializeBuffer
// instead of an exchanged message.
294 Integer my_rank = m_parallel_mng->commRank();
295 ConstArrayView<Int32> local_recv_indexes = m_data_to_recv_indexes[my_rank];
296 Integer nb_local_index = local_recv_indexes.size();
297 if (nb_local_index>0){
299 SerializeBuffer sbuf;
300 sbuf.setMode(ISerializer::ModeReserve);
302 written_data->serialize(&sbuf,m_local_send_indexes,0);
303 sbuf.allocateBuffer();
307 written_data->serialize(&sbuf,m_local_send_indexes,0);
// Read back from the buffer into 'data' at the local destination positions
// (the switch to a read mode is presumably on a line not visible here).
310 data->serialize(&sbuf,local_recv_indexes,0);
// Unpack each received message into 'data' at the positions recorded for
// the rank it came from.
315 Integer nb_recv = receivers.size();
316 for( Integer i=0; i<nb_recv; ++i ){
317 ISerializeMessage* recv_msg = exchanger->messageToReceive(i);
318 Int32 orig_rank = receivers[i];
320 ISerializer* serializer = recv_msg->serializer();
322 data->serialize(serializer,m_data_to_recv_indexes[orig_rank],0);
// Const helper whose name and first parameters are not visible in this view
// (presumably _searchUniqueIdIndexes(recv_uids, written_unique_ids, indexes),
// matching the three-argument calls made elsewhere in this file — TODO confirm).
// For each entry of recv_uids, finds its position in written_unique_ids and
// stores it in indexes[irecv]. 'indexes' is written by subscript, so it has
// to be sized by the caller beforehand.
// Raises a fatal error (ARCANE_FATAL) when a uid cannot be located.
329void ParallelDataReader::Impl::
332 Array<Int32>& indexes)
const
334 Integer nb_to_recv = recv_uids.size();
335 Integer nb_written_uid = written_unique_ids.size();
337 for( Integer irecv=0; irecv<nb_to_recv; ++irecv ){
338 Int64 my_uid = recv_uids[irecv];
// std::lower_bound requires written_unique_ids to be sorted (partitioned)
// with respect to my_uid.
340 auto iter_end = written_unique_ids.end();
341 auto iter_begin = written_unique_ids.begin();
342 auto x2 = std::lower_bound(iter_begin, iter_end, my_uid);
// The condition guarding this error is not visible in this view.
344 ARCANE_FATAL(
"Can not find uid uid={0} (with binary_search)", my_uid);
// lower_bound only yields the first element not less than my_uid, so the
// value found is compared again; the computation of my_index and the call
// that consumes the format arguments below are on lines not visible here.
348 if (written_unique_ids[my_index]!=my_uid)
350 "Index={0} uid={1} wuid={2} n={3}",
351 my_index,my_uid,written_unique_ids[my_index],nb_written_uid);
355 indexes[irecv] = my_index;
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Tableau d'items de types quelconques.
Interface du gestionnaire de parallélisme pour un sous-domaine.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Vue constante d'un tableau de type T.
@ ModePut
Le sérialiseur attend des reserve()
@ ModeGet
Le sérialiseur attend des get()
Classe d'accès aux traces.
Vecteur 1D de données avec sémantique par valeur (style STL).
Int32 toInt32(Int64 v)
Convertit un Int64 en un Int32.
Ref< IParallelExchanger > createExchangerRef(IParallelMng *pm)
Retourne une interface pour transférer des messages entre rangs.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Array< Int32 > Int32Array
Tableau dynamique à une dimension d'entiers 32 bits.
ConstArrayView< Int64 > Int64ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Int32 Integer
Type représentant un entier.