14#include "arcane/std/internal/ParallelDataWriter.h"
16#include "arcane/utils/Ref.h"
17#include "arcane/utils/Math.h"
19#include "arcane/core/IParallelMng.h"
20#include "arcane/core/IParallelExchanger.h"
21#include "arcane/core/ISerializer.h"
22#include "arcane/core/ISerializeMessage.h"
23#include "arcane/core/SerializeBuffer.h"
24#include "arcane/core/IData.h"
25#include "arcane/core/parallel/BitonicSortT.H"
26#include "arcane/core/ParallelMngUtils.h"
27#include "arcane/core/ItemGroup.h"
28#include "arcane/core/IItemFamily.h"
50 void setGatherAll(
bool v);
68 bool m_gather_all =
false;
69 bool m_is_verbose =
false;
94sortedUniqueIds()
const
96 return m_p->sortedUniqueIds();
98void ParallelDataWriter::
101 m_p->setGatherAll(v);
104void ParallelDataWriter::
107 m_p->sort(local_ids, items_uid);
110Ref<IData> ParallelDataWriter::
111getSortedValues(IData* data)
113 return m_p->getSortedValues(data);
119ParallelDataWriter::Impl::
120Impl(IParallelMng* pm)
121: TraceAccessor(pm->traceMng())
130sortedUniqueIds()
const
132 return m_sorted_unique_ids;
138void ParallelDataWriter::Impl::
147void ParallelDataWriter::Impl::
150 IParallelMng* pm = m_parallel_mng;
152 Parallel::BitonicSort<Int64> uid_sorter(pm);
153 uid_sorter.sort(items_uid);
155 ConstArrayView<Int32> key_indexes = uid_sorter.keyIndexes();
156 ConstArrayView<Int32> key_ranks = uid_sorter.keyRanks();
157 ConstArrayView<Int64> keys = uid_sorter.keys();
159 UniqueArray<Int64> global_all_keys;
160 UniqueArray<Int32> global_all_key_indexes;
161 UniqueArray<Int32> global_all_key_ranks;
163 Int32 nb_item = keys.size();
164 const Int32 my_rank = pm->commRank();
165 const bool is_verbose = m_is_verbose;
168 info() <<
"ORIGINAL I=" << i <<
" UID=" << items_uid[i]
169 <<
" INDEX=" << key_indexes[i]
170 <<
" RANK=" << key_ranks[i]
171 <<
" KEY=" << keys[i];
177 pm->allGatherVariable(keys, global_all_keys);
178 pm->allGatherVariable(key_indexes, global_all_key_indexes);
179 pm->allGatherVariable(key_ranks, global_all_key_ranks);
180 Int32 gather_rank = 0;
182 if (pm->commRank() != gather_rank) {
183 global_all_key_ranks.clear();
184 global_all_key_indexes.clear();
185 global_all_keys.clear();
187 nb_item = global_all_keys.size();
189 key_ranks = global_all_key_ranks.view();
190 key_indexes = global_all_key_indexes.view();
191 keys = global_all_keys.view();
196 m_sorted_unique_ids.resize(nb_item);
197 m_sorted_unique_ids.copy(keys);
202 info() <<
"I=" << i <<
" KEY=" << keys[i]
203 <<
" INDEX=" << key_indexes[i]
204 <<
" RANK=" << key_ranks[i];
208 UniqueArray<UniqueArray<Int32>> indexes_list(pm->commSize());
209 UniqueArray<UniqueArray<Int32>> own_indexes_list(pm->commSize());
212 for (
Integer i = 0; i < nb_item; ++i) {
213 Int32 index = key_indexes[i];
214 Int32 rank = key_ranks[i];
215 if (rank != my_rank && indexes_list[rank].empty())
216 sd_exchange->addSender(rank);
217 indexes_list[rank].add(index);
218 own_indexes_list[rank].add(i);
220 m_local_indexes_to_recv = own_indexes_list[my_rank];
221 m_local_indexes_to_send.resize(indexes_list[my_rank].size());
223 sd_exchange->initializeCommunicationsMessages();
228 m_indexes_to_recv.resize(nb_send);
229 m_ranks_to_recv.resize(nb_send);
230 for (
Integer i = 0; i < nb_send; ++i) {
232 ISerializeMessage* send_msg = sd_exchange->messageToSend(i);
233 Int32 dest_rank = send_sd[i];
234 ISerializer* serializer = send_msg->serializer();
235 m_indexes_to_recv[i] = own_indexes_list[dest_rank];
236 m_ranks_to_recv[i] = dest_rank;
237 serializer->setMode(ISerializer::ModeReserve);
239 serializer->reserveArray(indexes_list[dest_rank]);
241 serializer->allocateBuffer();
244 serializer->putArray(indexes_list[dest_rank]);
246 Integer nb_to_send = indexes_list[dest_rank].size();
247 for (
Integer z = 0; z < nb_to_send; ++z) {
248 Integer index = indexes_list[dest_rank][z];
249 info() <<
" SEND Z=" << z <<
" RANK=" << dest_rank <<
" index=" << index;
253 sd_exchange->processExchange();
255 ConstArrayView<Int32> recv_sd = sd_exchange->receiverRanks();
256 const Int32 nb_recv = recv_sd.size();
257 m_indexes_to_send.resize(nb_recv);
258 m_ranks_to_send.resize(nb_recv);
259 for (
Integer i = 0; i < nb_recv; ++i) {
261 ISerializeMessage* recv_msg = sd_exchange->messageToReceive(i);
262 Int32 orig_rank = recv_sd[i];
263 ISerializer* serializer = recv_msg->serializer();
265 Int32Array& recv_indexes = m_indexes_to_send[i];
266 m_ranks_to_send[i] = orig_rank;
267 serializer->getArray(recv_indexes);
268 const Int32 nb_to_recv = recv_indexes.
size();
270 for (
Integer z = 0; z < nb_to_recv; ++z) {
271 Int32 index = recv_indexes[z];
274 recv_indexes[z] = local_ids[index];
281 const Int32 nb_local = m_local_indexes_to_send.size();
282 for (
Int32 z = 0; z < nb_local; ++z) {
283 Int32 index = indexes_list[my_rank][z];
286 m_local_indexes_to_send[z] = local_ids[index];
295Ref<IData> ParallelDataWriter::Impl::
296getSortedValues(IData* data)
298 IParallelMng* pm = m_parallel_mng;
299 Ref<IData> sorted_data = data->cloneEmptyRef();
302 for (
Int32 rank_to_send : m_ranks_to_send)
303 sd_exchange->addSender(rank_to_send);
305 UniqueArray<Int32> ranks_to_recv2;
306 for (
Int32 rank_to_receive : m_ranks_to_recv)
307 ranks_to_recv2.add(rank_to_receive);
309 sd_exchange->initializeCommunicationsMessages(ranks_to_recv2);
312 for (
Integer i = 0; i < nb_send; ++i) {
314 ISerializeMessage* send_msg = sd_exchange->messageToSend(i);
316 ISerializer* serializer = send_msg->serializer();
317 serializer->setMode(ISerializer::ModeReserve);
318 data->serialize(serializer, m_indexes_to_send[i], 0);
319 serializer->allocateBuffer();
321 data->serialize(serializer, m_indexes_to_send[i], 0);
324 sd_exchange->processExchange();
328 sorted_data->resize(m_nb_item);
329 for (
Integer i = 0; i < nb_recv; ++i) {
331 ISerializeMessage* recv_msg = sd_exchange->messageToReceive(i);
333 ISerializer* serializer = recv_msg->serializer();
335 sorted_data->serialize(serializer, m_indexes_to_recv[i], 0);
340 ConstArrayView<Int32> local_recv_indexes = m_local_indexes_to_recv;
341 const Int32 nb_local_index = local_recv_indexes.size();
342 if (nb_local_index > 0) {
343 SerializeBuffer sbuf;
344 sbuf.setMode(ISerializer::ModeReserve);
345 data->serialize(&sbuf, m_local_indexes_to_send,
nullptr);
346 sbuf.allocateBuffer();
348 data->serialize(&sbuf, m_local_indexes_to_send,
nullptr);
350 sorted_data->serialize(&sbuf, local_recv_indexes,
nullptr);
362Ref<ParallelDataWriter> ParallelDataWriterList::
363getOrCreateWriter(
const ItemGroup& group)
365 auto i = m_data_writers.find(group);
366 if (i != m_data_writers.end())
368 IParallelMng* pm = group.itemFamily()->parallelMng();
369 Ref<ParallelDataWriter> writer =
makeRef(
new ParallelDataWriter(pm));
372 ItemGroup own_group = group.own();
373 MeshUtils::fillUniqueIds(own_group.view(), items_uid);
375 writer->sort(local_ids, items_uid);
377 m_data_writers.try_emplace(group, writer);
Utility functions for the mesh.
Integer size() const
Number of elements in the vector.
constexpr Integer size() const noexcept
Number of elements in the array.
Interface of a data item.
Interface of the parallelism manager for a subdomain.
@ ModePut
The serializer expects put().
@ ModeGet
The serializer expects get().
UniqueArray< Int32 > m_ranks_to_recv
Array indicating the ranks of processes to which we send information.
UniqueArray< Int32 > m_ranks_to_send
Array indicating the ranks of processes from which we receive information.
Reference to an instance.
TraceAccessor(ITraceMng *m)
Constructs an accessor via the trace manager m.
1D data vector with value semantics (STL style).
__host__ __device__ Real2 min(Real2 a, Real2 b)
Returns the minimum of two Real2.
Ref< IParallelExchanger > createExchangerRef(IParallelMng *pm)
Returns an interface to transfer messages between ranks.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
UniqueArray< Int64 > Int64UniqueArray
Dynamic 1D array of 64-bit integers.
Int32 Integer
Type representing an integer.
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
ConstArrayView< Int64 > Int64ConstArrayView
C equivalent of a 1D array of 64-bit integers.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
Array< Int32 > Int32Array
Dynamic one-dimensional array of 32-bit integers.
std::int32_t Int32
Signed integer type of 32 bits.