14#include "arcane/std/internal/ParallelDataReader.h"
16#include "arcane/utils/ScopedPtr.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/FixedArray.h"
19#include "arcane/utils/CheckedConvert.h"
21#include "arcane/core/IParallelMng.h"
22#include "arcane/core/IParallelExchanger.h"
23#include "arcane/core/ISerializer.h"
24#include "arcane/core/ISerializeMessage.h"
25#include "arcane/core/SerializeBuffer.h"
26#include "arcane/core/IData.h"
27#include "arcane/core/parallel/BitonicSortT.H"
28#include "arcane/core/ParallelMngUtils.h"
64 Int64Array& writtenUniqueIds() {
return m_written_unique_ids; }
65 Int64Array& wantedUniqueIds() {
return m_wanted_unique_ids; }
85 void getSortedValues(
IData* written_data,
IData* data);
112Array<Int64>& ParallelDataReader::
115 return m_p->writtenUniqueIds();
117Array<Int64>& ParallelDataReader::
120 return m_p->wantedUniqueIds();
122void ParallelDataReader::
127void ParallelDataReader::
128getSortedValues(IData* written_data, IData* data)
130 m_p->getSortedValues(written_data,data);
136ParallelDataReader::Impl::
137Impl(IParallelMng* pm)
138: TraceAccessor(pm->traceMng())
146void ParallelDataReader::Impl::
149 Int32 nb_wanted_uid = m_wanted_unique_ids.size();
151 Int32 nb_rank = m_parallel_mng->commSize();
152 Int32 my_rank = m_parallel_mng->commRank();
156 FixedArray<Int64, 2> min_max_written_uid;
157 min_max_written_uid[0] = NULL_ITEM_UNIQUE_ID;
158 min_max_written_uid[1] = NULL_ITEM_UNIQUE_ID;
159 Integer nb_written_uid = m_written_unique_ids.size();
161 if (nb_written_uid!=0){
164 min_max_written_uid[0] = m_written_unique_ids[0];
165 min_max_written_uid[1] = m_written_unique_ids[nb_written_uid-1];
167 m_parallel_mng->allGather(min_max_written_uid.view(), global_min_max_uid);
169 for(
Integer irank=0; irank<nb_rank; ++irank )
170 info(5) <<
"MIN_MAX_UIDS p=" << irank <<
" min=" << global_min_max_uid[irank*2]
171 <<
" max=" << global_min_max_uid[(irank*2)+1];
173 m_data_to_recv_indexes.resize(nb_rank);
175 UniqueArray< SharedArray<Int64> > uids_list(nb_rank);
177 for(
Integer i=0; i<nb_wanted_uid; ++i ){
178 Int64 uid = m_wanted_unique_ids[i];
181 for(
Int32 irank=0; irank<nb_rank; ++irank ){
182 if (uid>=global_min_max_uid[irank*2] && uid<=global_min_max_uid[(irank*2)+1]){
192 if (uids_list[rank].empty()){
193 exchanger->addSender(rank);
195 uids_list[rank].add(uid);
197 m_data_to_recv_indexes[rank].add(i);
199 exchanger->initializeCommunicationsMessages();
204 for(
Integer i=0; i<nb_send; ++i ){
206 ISerializeMessage* send_msg = exchanger->messageToSend(i);
207 Int32 dest_rank = senders[i];
208 ISerializer* serializer = send_msg->serializer();
209 serializer->setMode(ISerializer::ModeReserve);
210 serializer->reserveArray(uids_list[dest_rank]);
211 serializer->allocateBuffer();
213 serializer->putArray(uids_list[dest_rank]);
215 for(
Integer z=0; z<nb_to_send; ++z ){
216 Integer index = indexes_list[dest_rank][z];
217 info() <<
" SEND Z=" << z <<
" RANK=" << dest_rank <<
" index=" << index
218 <<
" own_index=" << indexes_to_recv[i][z];
222 exchanger->processExchange();
226 m_data_to_send_local_indexes.resize(nb_recv);
227 m_data_to_send_ranks.resize(nb_recv);
229 for(
Integer i=0; i<nb_recv; ++i ){
231 ISerializeMessage* recv_msg = exchanger->messageToReceive(i);
232 Int32 orig_rank = receivers[i];
233 m_data_to_send_ranks[i] = orig_rank;
234 ISerializer* serializer = recv_msg->serializer();
237 serializer->getArray(recv_uids);
240 m_data_to_send_local_indexes[i].resize(nb_to_recv);
241 _searchUniqueIdIndexes(recv_uids,m_written_unique_ids,m_data_to_send_local_indexes[i]);
247 Int32Array& local_recv_indexes = m_data_to_recv_indexes[my_rank];
248 Integer nb_local_index = local_recv_indexes.
size();
249 if (nb_local_index>0){
250 m_local_send_indexes.resize(nb_local_index);
252 for(
Integer i=0; i<nb_local_index; ++i ){
253 uids[i] = m_wanted_unique_ids[local_recv_indexes[i]];
255 _searchUniqueIdIndexes(uids,m_written_unique_ids,m_local_send_indexes);
263void ParallelDataReader::Impl::
264getSortedValues(IData* written_data,IData* data)
267 Integer nb_send = m_data_to_send_ranks.size();
268 for(
Integer i=0; i<nb_send; ++i ){
269 exchanger->addSender(m_data_to_send_ranks[i]);
272 exchanger->initializeCommunicationsMessages();
274 for(
Integer i=0; i<nb_send; ++i ){
275 ISerializeMessage* send_msg = exchanger->messageToSend(i);
278 ISerializer* serializer = send_msg->serializer();
279 serializer->setMode(ISerializer::ModeReserve);
281 written_data->serialize(serializer,m_data_to_send_local_indexes[i],0);
282 serializer->allocateBuffer();
285 written_data->serialize(serializer,m_data_to_send_local_indexes[i],0);
287 exchanger->processExchange();
289 Integer nb_wanted_uid = m_wanted_unique_ids.size();
290 data->resize(nb_wanted_uid);
294 Integer my_rank = m_parallel_mng->commRank();
295 ConstArrayView<Int32> local_recv_indexes = m_data_to_recv_indexes[my_rank];
296 Integer nb_local_index = local_recv_indexes.size();
297 if (nb_local_index>0){
299 SerializeBuffer sbuf;
300 sbuf.setMode(ISerializer::ModeReserve);
302 written_data->serialize(&sbuf,m_local_send_indexes,0);
303 sbuf.allocateBuffer();
307 written_data->serialize(&sbuf,m_local_send_indexes,0);
310 data->serialize(&sbuf,local_recv_indexes,0);
316 for(
Integer i=0; i<nb_recv; ++i ){
317 ISerializeMessage* recv_msg = exchanger->messageToReceive(i);
318 Int32 orig_rank = receivers[i];
320 ISerializer* serializer = recv_msg->serializer();
322 data->serialize(serializer,m_data_to_recv_indexes[orig_rank],0);
329void ParallelDataReader::Impl::
332 Array<Int32>& indexes)
const
334 Integer nb_to_recv = recv_uids.size();
335 Integer nb_written_uid = written_unique_ids.size();
337 for(
Integer irecv=0; irecv<nb_to_recv; ++irecv ){
338 Int64 my_uid = recv_uids[irecv];
340 auto iter_end = written_unique_ids.end();
341 auto iter_begin = written_unique_ids.begin();
342 auto x2 = std::lower_bound(iter_begin, iter_end, my_uid);
344 ARCANE_FATAL(
"Can not find uid uid={0} (with binary_search)", my_uid);
348 if (written_unique_ids[my_index]!=my_uid)
350 "Index={0} uid={1} wuid={2} n={3}",
351 my_index,my_uid,written_unique_ids[my_index],nb_written_uid);
355 indexes[irecv] = my_index;
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Integer size() const
Nombre d'éléments du vecteur.
Int64 largeSize() const
Nombre d'éléments du vecteur (en 64 bits)
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Interface du gestionnaire de parallélisme pour un sous-domaine.
@ ModePut
Le sérialiseur attend des reserve()
@ ModeGet
Le sérialiseur attend des get()
TraceAccessor(ITraceMng *m)
Construit un accesseur via le gestionnaire de trace m.
Vecteur 1D de données avec sémantique par valeur (style STL).
Int32 toInt32(Int64 v)
Converti un Int64 en un Int32.
Ref< IParallelExchanger > createExchangerRef(IParallelMng *pm)
Retourne une interface pour transférer des messages entre rangs.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Array< Int64 > Int64Array
Tableau dynamique à une dimension d'entiers 64 bits.
UniqueArray< Int64 > Int64UniqueArray
Tableau dynamique à une dimension d'entiers 64 bits.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
ConstArrayView< Int64 > Int64ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Array< Int32 > Int32Array
Tableau dynamique à une dimension d'entiers 32 bits.
std::int32_t Int32
Type entier signé sur 32 bits.