Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
MpiLegacyVariableSynchronizeDispatcher.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiLegacyVariableSynchronizeDispatcher.cc (C) 2000-2025 */
9/* */
10/* Specific MPI management for variable synchronization. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/MemoryView.h"
15
16#include "arcane/parallel/mpi/MpiParallelMng.h"
17#include "arcane/parallel/mpi/MpiDatatypeList.h"
18#include "arcane/parallel/mpi/MpiDatatype.h"
19#include "arcane/parallel/IStat.h"
20
21#include "arcane/impl/IDataSynchronizeBuffer.h"
22#include "arcane/impl/IDataSynchronizeImplementation.h"
23
24#include "arccore/message_passing_mpi/internal/IMpiProfiling.h"
25#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
26
27/*---------------------------------------------------------------------------*/
28/*---------------------------------------------------------------------------*/
29
30namespace Arcane
31{
32
33/*---------------------------------------------------------------------------*/
34/*---------------------------------------------------------------------------*/
35
/*!
 * \brief Variable synchronization implementation built on non-blocking MPI
 * point-to-point messages.
 *
 * beginSynchronize() posts the receives and the sends; endSynchronize() waits
 * for them and unpacks the received data as the receives complete.
 *
 * NOTE(review): the `override` specifiers imply a base class, but no
 * base-class clause is visible here — confirm against the original header.
 */
46class MpiLegacyVariableSynchronizerDispatcher
48{
49 public:
50
51 class Factory;
 //! Builds a dispatcher using the MPI parallel manager held by factory \a f.
52 explicit MpiLegacyVariableSynchronizerDispatcher(Factory* f);
53
54 protected:
55
 //! Nothing to precompute for this implementation (empty body).
56 void compute() override {}
 //! Posts the non-blocking receives and sends for \a buf.
57 void beginSynchronize(IDataSynchronizeBuffer* buf) override;
 //! Waits for the messages posted by beginSynchronize() and unpacks the received data.
58 void endSynchronize(IDataSynchronizeBuffer* buf) override;
59
60 private:
61
 //! MPI parallel manager (communicator, datatypes, profiling adapter, statistics).
62 MpiParallelMng* m_mpi_parallel_mng;
 //! Requests of the sends posted by beginSynchronize() (non-empty send buffers only).
63 UniqueArray<MPI_Request> m_send_requests;
 //! Receive request for each rank index of the synchronization buffer.
64 UniqueArray<MPI_Request> m_recv_requests;
 //! Per rank index: non-zero once the receive is completed (or was never posted).
65 UniqueArray<Integer> m_recv_requests_done;
 //! NOTE(review): not referenced by the code visible in this file.
66 UniqueArray<MPI_Datatype> m_share_derived_types;
 //! NOTE(review): not referenced by the code visible in this file.
67 UniqueArray<MPI_Datatype> m_ghost_derived_types;
68};
69
70/*---------------------------------------------------------------------------*/
71/*---------------------------------------------------------------------------*/
72
75{
76 public:
77
 //! Stores the MPI parallel manager handed to every dispatcher created by this factory.
78 explicit Factory(MpiParallelMng* mpi_pm)
79 : m_mpi_parallel_mng(mpi_pm)
80 {}
81
 //! Allocates a new MpiLegacyVariableSynchronizerDispatcher bound to this factory.
82 Ref<IDataSynchronizeImplementation> createInstance() override
83 {
84 auto* x = new MpiLegacyVariableSynchronizerDispatcher(this);
 // NOTE(review): no return statement is visible here; presumably `x` is
 // wrapped in a Ref and returned — confirm against the original file.
86 }
87
88 public:
89
 //! MPI parallel manager; read directly by the dispatcher constructor.
90 MpiParallelMng* m_mpi_parallel_mng = nullptr;
91};
92
93/*---------------------------------------------------------------------------*/
94/*---------------------------------------------------------------------------*/
95
97arcaneCreateMpiLegacyVariableSynchronizerFactory(MpiParallelMng* mpi_pm)
98{
 // NOTE(review): the return type and the body of this function are not visible
 // here; presumably it returns a reference to a new Factory built from
 // `mpi_pm` — confirm against the original file.
101}
102
103/*---------------------------------------------------------------------------*/
104/*---------------------------------------------------------------------------*/
105
106MpiLegacyVariableSynchronizerDispatcher::
107MpiLegacyVariableSynchronizerDispatcher(Factory* f)
108: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
109{
110}
111
112/*---------------------------------------------------------------------------*/
113/*---------------------------------------------------------------------------*/
114
115void MpiLegacyVariableSynchronizerDispatcher::
116beginSynchronize(IDataSynchronizeBuffer* vs_buf)
117{
118 Integer nb_message = vs_buf->nbRank();
119
120 m_send_requests.clear();
121
122 MpiParallelMng* pm = m_mpi_parallel_mng;
123 MPI_Comm comm = pm->communicator();
124 MpiDatatypeList* dtlist = pm->datatypes();
125 MP::Mpi::IMpiProfiling* mpi_profiling = m_mpi_parallel_mng->adapter()->getMpiProfiling();
126
127 //ITraceMng* trace = pm->traceMng();
128 //trace->info() << " ** ** MPI BEGIN SYNC n=" << nb_message
129 // << " this=" << (IVariableSynchronizeDispatcher*)this;
130 //trace->flush();
131
132 MPI_Datatype byte_dt = dtlist->datatype(Byte())->datatype();
133
134 //SyncBuffer& sync_buffer = this->m_1d_buffer;
135 // Send receive messages in non-blocking mode
136 m_recv_requests.resize(nb_message);
137 m_recv_requests_done.resize(nb_message);
138 double begin_prepare_time = MPI_Wtime();
139 for (Integer i = 0; i < nb_message; ++i) {
140 Int32 target_rank = vs_buf->targetRank(i);
141 auto ghost_local_buffer = vs_buf->receiveBuffer(i).bytes().smallView();
142 if (!ghost_local_buffer.empty()) {
143 MPI_Request mpi_request;
144 mpi_profiling->iRecv(ghost_local_buffer.data(), ghost_local_buffer.size(),
145 byte_dt, target_rank, 523, comm, &mpi_request);
146 m_recv_requests[i] = mpi_request;
147 m_recv_requests_done[i] = false;
148 //trace->info() << "POST RECV " << vsi.m_target_rank;
149 }
150 else {
151 // It is not necessary to send an empty message.
152 // Consider the message as finished
153 m_recv_requests[i] = MPI_Request();
154 m_recv_requests_done[i] = true;
155 }
156 }
157
158 vs_buf->copyAllSend();
159
160 // Send send messages in non-blocking mode.
161 for (Integer i = 0; i < nb_message; ++i) {
162 Int32 target_rank = vs_buf->targetRank(i);
163 auto share_local_buffer = vs_buf->sendBuffer(i).bytes().smallView();
164 if (!share_local_buffer.empty()) {
165 MPI_Request mpi_request;
166 mpi_profiling->iSend(share_local_buffer.data(), share_local_buffer.size(),
167 byte_dt, target_rank, 523, comm, &mpi_request);
168 m_send_requests.add(mpi_request);
169 }
170 }
171 {
172 double prepare_time = MPI_Wtime() - begin_prepare_time;
173 pm->stat()->add("SyncPrepare", prepare_time, vs_buf->totalSendSize());
174 }
175}
176
177/*---------------------------------------------------------------------------*/
178/*---------------------------------------------------------------------------*/
179
180void MpiLegacyVariableSynchronizerDispatcher::
181endSynchronize(IDataSynchronizeBuffer* vs_buf)
182{
183 MpiParallelMng* pm = m_mpi_parallel_mng;
184
185 //ITraceMng* trace = pm->traceMng();
186 //trace->info() << " ** ** MPI END SYNC "
187 // << " this=" << (IVariableSynchronizeDispatcher*)this;
188
189 UniqueArray<MPI_Request> remaining_request;
190 UniqueArray<Integer> remaining_indexes;
191
192 UniqueArray<MPI_Status> mpi_status;
193 UniqueArray<int> completed_requests;
194
195 UniqueArray<MPI_Request> m_remaining_recv_requests;
196 UniqueArray<Integer> m_remaining_recv_request_indexes;
197 double copy_time = 0.0;
198 double wait_time = 0.0;
199 while (1) {
200 m_remaining_recv_requests.clear();
201 m_remaining_recv_request_indexes.clear();
202 for (Integer i = 0; i < m_recv_requests.size(); ++i) {
203 if (!m_recv_requests_done[i]) {
204 m_remaining_recv_requests.add(m_recv_requests[i]);
205 m_remaining_recv_request_indexes.add(i); //m_recv_request_indexes[i]);
206 }
207 }
208 Integer nb_remaining_request = m_remaining_recv_requests.size();
209 if (nb_remaining_request == 0)
210 break;
211 int nb_completed_request = 0;
212 mpi_status.resize(nb_remaining_request);
213 completed_requests.resize(nb_remaining_request);
214 {
215 double begin_time = MPI_Wtime();
216 //trace->info() << "Wait some: n=" << nb_remaining_request
217 // << " total=" << nb_message;
218 m_mpi_parallel_mng->adapter()->getMpiProfiling()->waitSome(nb_remaining_request, m_remaining_recv_requests.data(),
219 &nb_completed_request, completed_requests.data(),
220 mpi_status.data());
221 //trace->info() << "Wait some end: nb_done=" << nb_completed_request;
222 double end_time = MPI_Wtime();
223 wait_time += (end_time - begin_time);
224 }
225 // For each completed request, perform the copy
226 for (int z = 0; z < nb_completed_request; ++z) {
227 int mpi_request_index = completed_requests[z];
228 Integer index = m_remaining_recv_request_indexes[mpi_request_index];
229
230 {
231 double begin_time = MPI_Wtime();
232 vs_buf->copyReceiveAsync(index);
233 double end_time = MPI_Wtime();
234 copy_time += (end_time - begin_time);
235 }
236 //trace->info() << "Mark finish index = " << index << " mpi_request_index=" << mpi_request_index;
237 m_recv_requests_done[index] = true; // To indicate that it is finished
238 }
239 }
240
241 //trace->info() << "Wait all begin: n=" << m_send_requests.size();
242 // Wait for sends to finish
243 mpi_status.resize(m_send_requests.size());
244 m_mpi_parallel_mng->adapter()->getMpiProfiling()->waitAll(m_send_requests.size(), m_send_requests.data(),
245 mpi_status.data());
246
247 // Ensure buffer copies are complete
248 vs_buf->barrier();
249
250 //trace->info() << "Wait all end";
251 {
252 Int64 total_ghost_size = vs_buf->totalReceiveSize();
253 Int64 total_share_size = vs_buf->totalSendSize();
254 Int64 total_size = total_ghost_size + total_share_size;
255 pm->stat()->add("SyncCopy", copy_time, total_ghost_size);
256 pm->stat()->add("SyncWait", wait_time, total_size);
257 }
258}
259
260/*---------------------------------------------------------------------------*/
261/*---------------------------------------------------------------------------*/
262
263} // End namespace Arcane
264
265/*---------------------------------------------------------------------------*/
266/*---------------------------------------------------------------------------*/
void resize(Int64 s)
Changes the number of elements in the array to s.
Generic buffer for data synchronization.
Interface for a generic dispatcher factory.
Parallelism manager using MPI.
Communicator communicator() const override
MPI communicator associated with this manager.
Reference to an instance.
1D data vector with value semantics (STL style).
void resize(Int64 s)
Changes the number of elements in the array to s.
void clear()
Removes the elements from the array.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
unsigned char Byte
Type of a byte.
Definition BaseTypes.h:43
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.