14#include "arcane/utils/FatalErrorException.h"
15#include "arcane/utils/MemoryView.h"
17#include "arcane/parallel/mpi/MpiParallelMng.h"
18#include "arcane/parallel/mpi/MpiTimeInterval.h"
19#include "arcane/parallel/IStat.h"
21#include "arcane/impl/IDataSynchronizeBuffer.h"
22#include "arcane/impl/IDataSynchronizeImplementation.h"
24#include "arccore/message_passing/IRequestList.h"
25#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
66class MpiVariableSynchronizeDispatcher
72 explicit MpiVariableSynchronizeDispatcher(
Factory* f);
76 void compute()
override {}
98 : m_mpi_parallel_mng(mpi_pm)
103 auto* x =
new MpiVariableSynchronizeDispatcher(
this);
125MpiVariableSynchronizeDispatcher::
126MpiVariableSynchronizeDispatcher(Factory* f)
127: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
128, m_receive_request_list(m_mpi_parallel_mng->createRequestListRef())
129, m_send_request_list(m_mpi_parallel_mng->createRequestListRef())
136void MpiVariableSynchronizeDispatcher::
137beginSynchronize(IDataSynchronizeBuffer* ds_buf)
139 Integer nb_message = ds_buf->nbRank();
141 m_send_request_list->clear();
143 MpiParallelMng* pm = m_mpi_parallel_mng;
145 MP::Mpi::MpiAdapter* mpi_adapter = pm->adapter();
146 const MPI_Datatype mpi_dt = MP::Mpi::MpiBuiltIn::datatype(
Byte());
148 double prepare_time = 0.0;
151 MpiTimeInterval tit(&prepare_time);
152 constexpr int serialize_tag = 523;
155 m_original_recv_requests_done.
resize(nb_message);
156 m_original_recv_requests.resize(nb_message);
159 for (
Integer i = 0; i < nb_message; ++i) {
160 Int32 target_rank = ds_buf->targetRank(i);
161 auto buf = ds_buf->receiveBuffer(i).bytes();
163 auto req = mpi_adapter->receiveNonBlockingNoStat(buf.data(), buf.size(),
164 target_rank, mpi_dt, serialize_tag);
165 m_original_recv_requests[i] = req;
166 m_original_recv_requests_done[i] =
false;
171 m_original_recv_requests[i] = Parallel::Request{};
172 m_original_recv_requests_done[i] =
true;
177 ds_buf->copyAllSend();
180 for (
Integer i = 0; i < nb_message; ++i) {
181 auto buf = ds_buf->sendBuffer(i).bytes();
182 Int32 target_rank = ds_buf->targetRank(i);
184 auto request = mpi_adapter->sendNonBlockingNoStat(buf.data(), buf.size(),
185 target_rank, mpi_dt, serialize_tag);
186 m_send_request_list->add(request);
190 pm->stat()->add(
"SyncPrepare", prepare_time, ds_buf->totalSendSize());
196void MpiVariableSynchronizeDispatcher::
197endSynchronize(IDataSynchronizeBuffer* ds_buf)
199 MpiParallelMng* pm = m_mpi_parallel_mng;
203 UniqueArray<Integer> remaining_original_indexes;
205 double copy_time = 0.0;
206 double wait_time = 0.0;
210 m_receive_request_list->clear();
211 remaining_original_indexes.clear();
212 for (
Integer i = 0, n = m_original_recv_requests_done.size(); i < n; ++i) {
213 if (!m_original_recv_requests_done[i]) {
214 m_receive_request_list->add(m_original_recv_requests[i]);
215 remaining_original_indexes.add(i);
218 Integer nb_remaining_request = m_receive_request_list->size();
219 if (nb_remaining_request == 0)
223 MpiTimeInterval tit(&wait_time);
228 ConstArrayView<Int32> done_requests = m_receive_request_list->doneRequestIndexes();
230 for (
Int32 request_index : done_requests) {
231 Int32 orig_index = remaining_original_indexes[request_index];
234 m_original_recv_requests_done[orig_index] =
true;
238 MpiTimeInterval tit(©_time);
239 ds_buf->copyReceiveAsync(orig_index);
248 MpiTimeInterval tit(&wait_time);
249 m_send_request_list->wait(Parallel::WaitAll);
255 Int64 total_ghost_size = ds_buf->totalReceiveSize();
256 Int64 total_share_size = ds_buf->totalSendSize();
257 Int64 total_size = total_ghost_size + total_share_size;
258 pm->stat()->add(
"SyncCopy", copy_time, total_ghost_size);
259 pm->stat()->add(
"SyncWait", wait_time, total_size);
void resize(Int64 s)
Change le nombre d'éléments du tableau à s.
Buffer générique pour la synchronisation de données.
Interface d'une fabrique dispatcher générique.
Gestionnaire du parallélisme utilisant MPI.
Référence à une instance.
Vecteur 1D de données avec sémantique par valeur (style STL).
@ WaitSome
Attend que tous les messages de la liste soient traités.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
unsigned char Byte
Type d'un octet.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.