Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
MpiBlockVariableSynchronizeDispatcher.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiBlockVariableSynchronizeDispatcher.cc (C) 2000-2025 */
9/* */
10/* MPI-specific variable synchronization management. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/FatalErrorException.h"
15#include "arcane/utils/MemoryView.h"
16
17#include "arcane/parallel/mpi/MpiParallelMng.h"
18#include "arcane/parallel/mpi/MpiTimeInterval.h"
19#include "arcane/parallel/IStat.h"
20
21#include "arcane/impl/IDataSynchronizeBuffer.h"
22#include "arcane/impl/IDataSynchronizeImplementation.h"
23
24#include "arccore/message_passing/IRequestList.h"
25#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
26
27/*---------------------------------------------------------------------------*/
28/*---------------------------------------------------------------------------*/
29/*
30 * This implementation divides the synchronization into fixed-size blocks.
31 * The MPI exchange (steps 2 and 3) is in endSynchronize(); step 1 is done in beginSynchronize().
32 * The algorithm is as follows:
33 *
34 * 1. Copy the values to be sent into the send buffers.
35 * 2. Loop over Irecv/ISend/WaitAll as long as there is at least one non-empty part.
36 * 3. Copy the variable values from the receive buffers.
37*/
38/*---------------------------------------------------------------------------*/
39/*---------------------------------------------------------------------------*/
40
41namespace Arcane
42{
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
// Variable synchronization implementation that exchanges the send/receive
// buffers through MPI in fixed-size blocks (see endSynchronize()).
// NOTE(review): the base-class clause (listing line 55) is absent from this
// extract; the 'override' specifiers below imply one, presumably
// IDataSynchronizeImplementation -- confirm against the real file.
54class MpiBlockVariableSynchronizerDispatcher
56{
57 public:
58
// Factory holding the construction parameters of this class.
59 class Factory;
// Builds an instance from the parallel manager, block size and number of
// sequences stored in 'f'.
60 explicit MpiBlockVariableSynchronizerDispatcher(Factory* f);
61
62 public:
63
// Nothing to compute for this implementation (empty body).
64 void compute() override {}
// Copies the values to send into the send buffers; posts no MPI message.
65 void beginSynchronize(IDataSynchronizeBuffer* buf) override;
// Performs the blocked MPI exchange, then copies the received values.
66 void endSynchronize(IDataSynchronizeBuffer* buf) override;
67
68 private:
69
// Parallel manager used for the rank, the MPI adapter and the statistics.
70 MpiParallelMng* m_mpi_parallel_mng = nullptr;
// Request list reused (cleared) at each pass of endSynchronize().
71 Ref<Parallel::IRequestList> m_request_list;
// Size given to subSpan() when slicing the buffers' bytes() view.
// No default initializer: set by the constructor.
72 Int32 m_block_size;
// Number of sequences looped over in endSynchronize().
// No default initializer: set by the constructor.
73 Int32 m_nb_sequence;
74
75 private:
76
// Returns false when there is a single sequence, otherwise returns
// (rank % m_nb_sequence) == sequence.
77 bool _isSkipRank(Int32 rank, Int32 sequence) const;
78};
79
80/*---------------------------------------------------------------------------*/
81/*---------------------------------------------------------------------------*/
82
// Body of MpiBlockVariableSynchronizerDispatcher::Factory.
// NOTE(review): the class header (listing lines 83-84) is absent from this
// extract; 'createInstance() override' implies a factory base interface --
// confirm against the real file.
85{
86 public:
87
// Stores the parameters that are handed over to each created dispatcher.
88 Factory(MpiParallelMng* mpi_pm, Int32 block_size, Int32 nb_sequence)
89 : m_mpi_parallel_mng(mpi_pm)
90 , m_block_size(block_size)
91 , m_nb_sequence(nb_sequence)
92 {}
93
// Allocates a new dispatcher configured from this factory.
// NOTE(review): the return statement (listing line 97) is absent from this
// extract; presumably it wraps 'x' in a Ref -- confirm against the real file.
94 Ref<IDataSynchronizeImplementation> createInstance() override
95 {
96 auto* x = new MpiBlockVariableSynchronizerDispatcher(this);
98 }
99
100 public:
101
// Public on purpose: read directly by the dispatcher constructor.
102 MpiParallelMng* m_mpi_parallel_mng = nullptr;
103 Int32 m_block_size = 0;
104 Int32 m_nb_sequence = 0;
105};
106
107/*---------------------------------------------------------------------------*/
108/*---------------------------------------------------------------------------*/
109
// Free function creating the factory of block-based MPI synchronizers from
// a parallel manager, a block size and a number of sequences.
// NOTE(review): the return-type line (listing line 110) and the return
// statement (listing line 114) are absent from this extract; presumably the
// allocated factory is returned wrapped in a Ref -- confirm against the real file.
111arcaneCreateMpiBlockVariableSynchronizerFactory(MpiParallelMng* mpi_pm, Int32 block_size, Int32 nb_sequence)
112{
113 auto* x = new MpiBlockVariableSynchronizerDispatcher::Factory(mpi_pm, block_size, nb_sequence);
115}
116
117/*---------------------------------------------------------------------------*/
118/*---------------------------------------------------------------------------*/
119
120MpiBlockVariableSynchronizerDispatcher::
121MpiBlockVariableSynchronizerDispatcher(Factory* f)
122: m_mpi_parallel_mng(f->m_mpi_parallel_mng)
123, m_request_list(m_mpi_parallel_mng->createRequestListRef())
124, m_block_size(f->m_block_size)
125, m_nb_sequence(f->m_nb_sequence)
126{
127}
128
129/*---------------------------------------------------------------------------*/
130/*---------------------------------------------------------------------------*/
131
132bool MpiBlockVariableSynchronizerDispatcher::
133_isSkipRank(Int32 rank, Int32 sequence) const
134{
135 if (m_nb_sequence == 1)
136 return false;
137 return (rank % m_nb_sequence) == sequence;
138}
139
140/*---------------------------------------------------------------------------*/
141/*---------------------------------------------------------------------------*/
142
143void MpiBlockVariableSynchronizerDispatcher::
144beginSynchronize(IDataSynchronizeBuffer* vs_buf)
145{
146 // Does nothing at the MPI level in this part because this implementation
147 // does not support asynchronous operations.
148 // We only copy the variable values into the send buffer to allow the
149 // variable values to be modified between _beginSynchronize() and
150 // _endSynchronize().
151
152 double send_copy_time = 0.0;
153 {
154 MpiTimeInterval tit(&send_copy_time);
155 // Copy send buffers
156 vs_buf->copyAllSend();
157 }
158 Int64 total_share_size = vs_buf->totalSendSize();
159 m_mpi_parallel_mng->stat()->add("SyncSendCopy", send_copy_time, total_share_size);
160}
161
162/*---------------------------------------------------------------------------*/
163/*---------------------------------------------------------------------------*/
164
165void MpiBlockVariableSynchronizerDispatcher::
166endSynchronize(IDataSynchronizeBuffer* vs_buf)
167{
168 const Int32 nb_message = vs_buf->nbRank();
169
170 MpiParallelMng* pm = m_mpi_parallel_mng;
171 Int32 my_rank = pm->commRank();
172
173 MP::Mpi::MpiAdapter* mpi_adapter = pm->adapter();
174 const MPI_Datatype mpi_dt = MP::Mpi::MpiBuiltIn::datatype(Byte());
175
176 double prepare_time = 0.0;
177 double copy_time = 0.0;
178 double wait_time = 0.0;
179
180 constexpr int serialize_tag = 523;
181
182 const Int32 block_size = m_block_size;
183
184 for (Int32 isequence = 0; isequence < m_nb_sequence; ++isequence) {
185 Int32 block_index = 0;
186 while (1) {
187 {
188 MpiTimeInterval tit(&prepare_time);
189 m_request_list->clear();
190
191 // Post receive messages
192 for (Integer i = 0; i < nb_message; ++i) {
193 Int32 target_rank = vs_buf->targetRank(i);
194 if (_isSkipRank(target_rank, isequence))
195 continue;
196 auto buf0 = vs_buf->receiveBuffer(i).bytes();
197 auto buf = buf0.subSpan(block_index, block_size);
198 if (!buf.empty()) {
199 auto req = mpi_adapter->receiveNonBlockingNoStat(buf.data(), buf.size(),
200 target_rank, mpi_dt, serialize_tag);
201 m_request_list->add(req);
202 }
203 }
204
205 // Post send messages in non-blocking mode.
206 for (Integer i = 0; i < nb_message; ++i) {
207 Int32 target_rank = vs_buf->targetRank(i);
208 if (_isSkipRank(my_rank, isequence))
209 continue;
210 auto buf0 = vs_buf->sendBuffer(i).bytes();
211 auto buf = buf0.subSpan(block_index, block_size);
212 if (!buf.empty()) {
213 auto request = mpi_adapter->sendNonBlockingNoStat(buf.data(), buf.size(),
214 target_rank, mpi_dt, serialize_tag);
215 m_request_list->add(request);
216 }
217 }
218 }
219
220 // If no requests, we are done with our synchronization
221 if (m_request_list->size() == 0)
222 break;
223
224 // Wait for messages to finish
225 {
226 MpiTimeInterval tit(&wait_time);
227 m_request_list->wait(Parallel::WaitAll);
228 }
229
230 block_index += block_size;
231 }
232 }
233
234 // Copy received values
235 {
236 MpiTimeInterval tit(&copy_time);
237 vs_buf->copyAllReceive();
238 }
239
240 Int64 total_ghost_size = vs_buf->totalReceiveSize();
241 Int64 total_share_size = vs_buf->totalSendSize();
242 Int64 total_size = total_ghost_size + total_share_size;
243 pm->stat()->add("SyncCopy", copy_time, total_ghost_size);
244 pm->stat()->add("SyncWait", wait_time, total_size);
245 pm->stat()->add("SyncPrepare", prepare_time, total_share_size);
246}
247
248/*---------------------------------------------------------------------------*/
249/*---------------------------------------------------------------------------*/
250
251} // End namespace Arcane
252
253/*---------------------------------------------------------------------------*/
254/*---------------------------------------------------------------------------*/
Generic buffer for data synchronization.
Interface for a generic dispatcher factory.
Parallelism manager using MPI.
Reference to an instance.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
unsigned char Byte
Type of a byte.
Definition BaseTypes.h:43
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.