Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
MpiVariableSynchronizeDispatcher.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiVariableSynchronizeDispatcher.cc (C) 2000-2025 */
9/* */
10/* Specific MPI handling for variable synchronization. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/FatalErrorException.h"
15#include "arcane/utils/MemoryView.h"
16
17#include "arcane/parallel/mpi/MpiParallelMng.h"
18#include "arcane/parallel/mpi/MpiTimeInterval.h"
19#include "arcane/parallel/IStat.h"
20
21#include "arcane/impl/IDataSynchronizeBuffer.h"
22#include "arcane/impl/IDataSynchronizeImplementation.h"
23
24#include "arccore/message_passing/IRequestList.h"
25#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
26
27/*---------------------------------------------------------------------------*/
28/*---------------------------------------------------------------------------*/
29
30/*
31 * The synchronization algorithm works as follows. The first three points are
32 * in beginSynchronize() and the last two are in endSynchronize(). The current
33 * code only allows one non-blocking synchronization at a time.
34 *
35 * 1. Post receive messages
36 * 2. Copy the values to be sent into the send buffers. This is done after
37 * posting the receive messages to allow for some overlap between computation
38 * and communication.
39 * 3. Post send messages.
40 * 4. Perform a WaitSome on the receive messages. As soon as a message
41 * arrives, the receive buffer is copied into the variable array. The code
42 * could be simplified by using WaitAll and copying all values at the end.
43 * 5. Perform a WaitAll on the send messages to free the requests.
44*/
45/*---------------------------------------------------------------------------*/
46/*---------------------------------------------------------------------------*/
47
48namespace Arcane
49{
50
51/*---------------------------------------------------------------------------*/
52/*---------------------------------------------------------------------------*/
53
64class MpiVariableSynchronizeDispatcher
66{
67 public:
68
69 class Factory;
70 explicit MpiVariableSynchronizeDispatcher(Factory* f);
71
72 protected:
73
74 void compute() override {}
75 void beginSynchronize(IDataSynchronizeBuffer* ds_buf) override;
76 void endSynchronize(IDataSynchronizeBuffer* ds_buf) override;
77
78 private:
79
80 MpiParallelMng* m_mpi_parallel_mng;
81 UniqueArray<Parallel::Request> m_original_recv_requests;
82 UniqueArray<bool> m_original_recv_requests_done;
83 Ref<Parallel::IRequestList> m_receive_request_list;
84 Ref<Parallel::IRequestList> m_send_request_list;
85};
86
87/*---------------------------------------------------------------------------*/
88/*---------------------------------------------------------------------------*/
89
92{
93 public:
94
95 explicit Factory(MpiParallelMng* mpi_pm)
96 : m_mpi_parallel_mng(mpi_pm)
97 {}
98
99 Ref<IDataSynchronizeImplementation> createInstance() override
100 {
101 auto* x = new MpiVariableSynchronizeDispatcher(this);
103 }
104
105 public:
106
107 MpiParallelMng* m_mpi_parallel_mng = nullptr;
108};
109
110/*---------------------------------------------------------------------------*/
111/*---------------------------------------------------------------------------*/
112
114arcaneCreateMpiVariableSynchronizerFactory(MpiParallelMng* mpi_pm)
115{
116 auto* x = new MpiVariableSynchronizeDispatcher::Factory(mpi_pm);
118}
119
120/*---------------------------------------------------------------------------*/
121/*---------------------------------------------------------------------------*/
122
123MpiVariableSynchronizeDispatcher::
124MpiVariableSynchronizeDispatcher(Factory* f)
125: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
126, m_receive_request_list(m_mpi_parallel_mng->createRequestListRef())
127, m_send_request_list(m_mpi_parallel_mng->createRequestListRef())
128{
129}
130
131/*---------------------------------------------------------------------------*/
132/*---------------------------------------------------------------------------*/
133
134void MpiVariableSynchronizeDispatcher::
135beginSynchronize(IDataSynchronizeBuffer* ds_buf)
136{
137 Integer nb_message = ds_buf->nbRank();
138
139 m_send_request_list->clear();
140
141 MpiParallelMng* pm = m_mpi_parallel_mng;
142
143 MP::Mpi::MpiAdapter* mpi_adapter = pm->adapter();
144 const MPI_Datatype mpi_dt = MP::Mpi::MpiBuiltIn::datatype(Byte());
145
146 double prepare_time = 0.0;
147
148 {
149 MpiTimeInterval tit(&prepare_time);
150 constexpr int serialize_tag = 523;
151
152 // Send receive messages in non-blocking mode
153 m_original_recv_requests_done.resize(nb_message);
154 m_original_recv_requests.resize(nb_message);
155
156 // Post receive messages
157 for (Integer i = 0; i < nb_message; ++i) {
158 Int32 target_rank = ds_buf->targetRank(i);
159 auto buf = ds_buf->receiveBuffer(i).bytes();
160 if (!buf.empty()) {
161 auto req = mpi_adapter->receiveNonBlockingNoStat(buf.data(), buf.size(),
162 target_rank, mpi_dt, serialize_tag);
163 m_original_recv_requests[i] = req;
164 m_original_recv_requests_done[i] = false;
165 }
166 else {
167 // It is not necessary to send an empty message.
168 // Consider the message finished
169 m_original_recv_requests[i] = Parallel::Request{};
170 m_original_recv_requests_done[i] = true;
171 }
172 }
173
174 // Copy send buffers into \a var_values
175 ds_buf->copyAllSend();
176
177 // Post send messages in non-blocking mode.
178 for (Integer i = 0; i < nb_message; ++i) {
179 auto buf = ds_buf->sendBuffer(i).bytes();
180 Int32 target_rank = ds_buf->targetRank(i);
181 if (!buf.empty()) {
182 auto request = mpi_adapter->sendNonBlockingNoStat(buf.data(), buf.size(),
183 target_rank, mpi_dt, serialize_tag);
184 m_send_request_list->add(request);
185 }
186 }
187 }
188 pm->stat()->add("SyncPrepare", prepare_time, ds_buf->totalSendSize());
189}
190
191/*---------------------------------------------------------------------------*/
192/*---------------------------------------------------------------------------*/
193
194void MpiVariableSynchronizeDispatcher::
195endSynchronize(IDataSynchronizeBuffer* ds_buf)
196{
197 MpiParallelMng* pm = m_mpi_parallel_mng;
198
199 // We need to keep the original index in 'SyncBuffer'
200 // of each request to manage the copies.
201 UniqueArray<Integer> remaining_original_indexes;
202
203 double copy_time = 0.0;
204 double wait_time = 0.0;
205
206 while (1) {
207 // Create the list of still active requests.
208 m_receive_request_list->clear();
209 remaining_original_indexes.clear();
210 for (Integer i = 0, n = m_original_recv_requests_done.size(); i < n; ++i) {
211 if (!m_original_recv_requests_done[i]) {
212 m_receive_request_list->add(m_original_recv_requests[i]);
213 remaining_original_indexes.add(i);
214 }
215 }
216 Integer nb_remaining_request = m_receive_request_list->size();
217 if (nb_remaining_request == 0)
218 break;
219
220 {
221 MpiTimeInterval tit(&wait_time);
222 m_receive_request_list->wait(Parallel::WaitSome);
223 }
224
225 // For each completed request, perform the copy
226 ConstArrayView<Int32> done_requests = m_receive_request_list->doneRequestIndexes();
227
228 for (Int32 request_index : done_requests) {
229 Int32 orig_index = remaining_original_indexes[request_index];
230
231 // To indicate that it is finished
232 m_original_recv_requests_done[orig_index] = true;
233
234 // Copy the received values
235 {
236 MpiTimeInterval tit(&copy_time);
237 ds_buf->copyReceiveAsync(orig_index);
238 }
239 }
240 }
241
242 // Wait for sends to finish.
243 // This must be done to free the requests even if the message
244 // has arrived.
245 {
246 MpiTimeInterval tit(&wait_time);
247 m_send_request_list->wait(Parallel::WaitAll);
248 }
249
250 // Ensure that buffer copies are completed
251 ds_buf->barrier();
252
253 Int64 total_ghost_size = ds_buf->totalReceiveSize();
254 Int64 total_share_size = ds_buf->totalSendSize();
255 Int64 total_size = total_ghost_size + total_share_size;
256 pm->stat()->add("SyncCopy", copy_time, total_ghost_size);
257 pm->stat()->add("SyncWait", wait_time, total_size);
258}
259
260/*---------------------------------------------------------------------------*/
261/*---------------------------------------------------------------------------*/
262
263} // End namespace Arcane
264
265/*---------------------------------------------------------------------------*/
266/*---------------------------------------------------------------------------*/
void resize(Int64 s)
Changes the number of elements in the array to s.
Generic buffer for data synchronization.
Interface for a generic dispatcher factory.
Parallelism manager using MPI.
Reference to an instance.
1D data vector with value semantics (STL style).
@ WaitSome
Wait until at least one message in the list is processed.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
unsigned char Byte
Type of a byte.
Definition BaseTypes.h:43
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.