Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
HybridMessageQueue.h
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* HybridMessageQueue.h (C) 2000-2024 */
9/* */
10/* Message queue for a hybrid MPI/Shared Memory implementation. */
11/*---------------------------------------------------------------------------*/
12#ifndef ARCANE_PARALLEL_THREAD_HYBRIDMESSAGEQUEUE_H
13#define ARCANE_PARALLEL_THREAD_HYBRIDMESSAGEQUEUE_H
14/*---------------------------------------------------------------------------*/
15/*---------------------------------------------------------------------------*/
16
17#include "arcane/utils/CheckedConvert.h"
18#include "arcane/utils/TraceAccessor.h"
19#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
22#include "arcane/parallel/mpi/ArcaneMpi.h"
23
24/*---------------------------------------------------------------------------*/
25/*---------------------------------------------------------------------------*/
26
27namespace Arcane
28{
29class MpiParallelMng;
30}
31
33{
34
35/*---------------------------------------------------------------------------*/
36/*---------------------------------------------------------------------------*/
42{
43 public:
44
45 static FullRankInfo compute(MP::MessageRank rank, Int32 local_nb_rank)
46 {
47 Int32 r = rank.value();
48 FullRankInfo fri;
49 fri.m_local_rank.setValue(r % local_nb_rank);
50 fri.m_global_rank.setValue(r);
51 fri.m_mpi_rank.setValue(r / local_nb_rank);
52 return fri;
53 }
54 friend std::ostream& operator<<(std::ostream& o, const FullRankInfo& fri);
55
56 public:
57
59 MP::MessageRank localRank() const { return m_local_rank; }
60 Int32 localRankValue() const { return m_local_rank.value(); }
62 MP::MessageRank globalRank() const { return m_global_rank; }
63 Int32 globalRankValue() const { return m_global_rank.value(); }
65 MP::MessageRank mpiRank() const { return m_mpi_rank; }
66 Int32 mpiRankValue() const { return m_mpi_rank.value(); }
67
68 private:
69
71 MP::MessageRank m_local_rank;
73 MP::MessageRank m_global_rank;
75 MP::MessageRank m_mpi_rank;
76};
77
78/*---------------------------------------------------------------------------*/
79/*---------------------------------------------------------------------------*/
83class SourceDestinationFullRankInfo
84{
85 public:
86
87 SourceDestinationFullRankInfo(FullRankInfo s, FullRankInfo d)
88 : m_source(s)
89 , m_destination(d)
90 {}
91
92 public:
93
94 FullRankInfo source() const { return m_source; }
95 FullRankInfo destination() const { return m_destination; }
96 bool isSameMpiRank() const
97 {
98 return m_source.mpiRank() == m_destination.mpiRank();
99 }
100
101 private:
102
103 FullRankInfo m_source;
104 FullRankInfo m_destination;
105};
106
107/*---------------------------------------------------------------------------*/
108/*---------------------------------------------------------------------------*/
123{
124 public:
125
127 // NOTE: theoretically, this value can be calculated dynamically by taking
128 // into account the max valid value for MPI (via MPI_Comm_get_attribute(MPI_TAG_UB))
129 // and the maximum number of local threads. However, this would make
130 // the code too dependent on the implementation.
131 // Note that the MPI standard only guarantees that tag values up to
132 // 32767 (2^15 - 1) are valid.
133 static constexpr Int32 MAX_USER_TAG_BIT = 14;
134 static constexpr Int32 MAX_USER_TAG = 1 << MAX_USER_TAG_BIT;
135
136 public:
137
140 : m_nb_rank(nb_rank)
141 {}
142 Int32 nbLocalRank() const { return m_nb_rank; }
143 FullRankInfo rank(MessageRank user_rank) const
144 {
145 return FullRankInfo::compute(user_rank, m_nb_rank);
146 }
147 SourceDestinationFullRankInfo rank(MessageRank rank1, MessageRank rank2) const
148 {
149 auto x1 = FullRankInfo::compute(rank1, m_nb_rank);
150 auto x2 = FullRankInfo::compute(rank2, m_nb_rank);
151 return SourceDestinationFullRankInfo(x1, x2);
152 }
153 MessageTag tagForSend(MessageTag user_tag, FullRankInfo orig, FullRankInfo dest) const
154 {
155 return _tag(user_tag, dest.localRank(), orig.localRank());
156 }
157 MessageTag tagForSend(MessageTag user_tag, SourceDestinationFullRankInfo fri) const
158 {
159 return tagForSend(user_tag, fri.source(), fri.destination());
160 }
161 MessageTag tagForReceive(MessageTag user_tag, FullRankInfo orig, FullRankInfo dest) const
162 {
163 return _tag(user_tag, orig.localRank(), dest.localRank());
164 }
165 MessageTag tagForReceive(MessageTag user_tag, MessageRank orig_local, MessageRank dest_local) const
166 {
167 return _tag(user_tag, orig_local, dest_local);
168 }
169 MessageTag tagForReceive(MessageTag user_tag, SourceDestinationFullRankInfo fri) const
170 {
171 return tagForReceive(user_tag, fri.source(), fri.destination());
172 }
175 {
176 Int32 t = internal_tag.value() >> MAX_USER_TAG_BIT;
177 return t % m_nb_rank;
178 }
179
180 private:
181
182 MessageTag _tag(MessageTag user_tag, MessageRank orig_local, MessageRank dest_local) const
183 {
184 Int64 utag = user_tag.value();
185 if (utag > MAX_USER_TAG)
186 ARCANE_FATAL("User tag is too big v={0} max={1}", utag, MAX_USER_TAG);
187 Int32 d = dest_local.value();
188 Int32 o = orig_local.value();
189 Int64 new_tag = (o * m_nb_rank + d) << MAX_USER_TAG_BIT;
190 new_tag += utag;
191 return MessageTag(CheckedConvert::toInt32(new_tag));
192 }
193
194 private:
195
196 Int32 m_nb_rank;
197};
198
199/*---------------------------------------------------------------------------*/
200/*---------------------------------------------------------------------------*/
201
206class HybridMessageQueue
207: public TraceAccessor
208{
209 public:
210
211 HybridMessageQueue(ISharedMemoryMessageQueue* thread_queue, MpiParallelMng* mpi_pm,
212 Int32 local_nb_rank);
213
214 public:
215
216 void waitAll(ArrayView<Request> requests);
217 void waitSome(Int32 rank, ArrayView<Request> requests,
218 ArrayView<bool> requests_done, bool is_non_blocking);
219
220 public:
221
222 Request addReceive(const PointToPointMessageInfo& message, ReceiveBufferInfo buf);
223 Request addSend(const PointToPointMessageInfo& message, SendBufferInfo buf);
224 MessageId probe(const MP::PointToPointMessageInfo& message);
225 MP::MessageSourceInfo legacyProbe(const MP::PointToPointMessageInfo& message);
226 const RankTagBuilder& rankTagBuilder() const { return m_rank_tag_builder; }
227
228 private:
229
230 ISharedMemoryMessageQueue* m_thread_queue;
231 MpiParallelMng* m_mpi_parallel_mng;
232 MpiAdapter* m_mpi_adapter;
233 Int32 m_local_nb_rank;
234 RankTagBuilder m_rank_tag_builder;
235 Int32 m_debug_level = 0;
236 bool m_is_allow_null_rank_for_any_source = true;
237
238 private:
239
240 Request _addReceiveRankTag(const PointToPointMessageInfo& message, ReceiveBufferInfo buf_info);
241 Request _addReceiveMessageId(const PointToPointMessageInfo& message, ReceiveBufferInfo buf_info);
242 void _checkValidRank(MessageRank rank);
243 void _checkValidSource(const PointToPointMessageInfo& message);
244 SourceDestinationFullRankInfo _getFullRankInfo(const PointToPointMessageInfo& message)
245 {
246 return m_rank_tag_builder.rank(message.emiterRank(), message.destinationRank());
247 }
249 _buildSharedMemoryMessage(const PointToPointMessageInfo& message,
252 _buildMPIMessage(const PointToPointMessageInfo& message,
254 Integer _testOrWaitSome(Int32 rank, ArrayView<Request> requests,
255 ArrayView<bool> requests_done);
256 MessageId _probe(const MP::PointToPointMessageInfo& message, bool use_message_id);
257};
258
259/*---------------------------------------------------------------------------*/
260/*---------------------------------------------------------------------------*/
261
262} // End namespace Arcane::MessagePassing
263
264/*---------------------------------------------------------------------------*/
265/*---------------------------------------------------------------------------*/
266
267#endif
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
Declarations of Arcane's general types.
File containing declarations concerning the message passing model.
Modifiable view of an array of type T.
Correspondence information between the different ranks of a communicator.
MP::MessageRank m_global_rank
Global rank within the communicator.
MP::MessageRank globalRank() const
Global rank within the communicator.
MP::MessageRank mpiRank() const
Associated MPI rank.
MP::MessageRank localRank() const
Local rank within the threads.
MP::MessageRank m_mpi_rank
Associated MPI rank.
MP::MessageRank m_local_rank
Local rank within the threads.
Int32 value() const
Rank value.
Definition MessageRank.h:76
Information for sending/receiving a point-to-point message.
MessageRank emiterRank() const
Message sender rank.
MessageRank destinationRank() const
Message destination rank.
Class to calculate a tag containing sender and receiver information from a user tag.
static constexpr Int32 MAX_USER_TAG_BIT
We allow 2^14 tags, i.e., 16384.
Int32 getReceiveRankFromTag(MessageTag internal_tag) const
Retrieves the rank from the tag. This is the inverse operation of _tag().
RankTagBuilder(Int32 nb_rank)
Constructs an instance for local_nb_rank.
Encapsulates source/destination information.
Parallelism manager using MPI.
TraceAccessor(ITraceMng *m)
Constructs an accessor via the trace manager m.
Declarations of types and methods used by message exchange mechanisms.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
std::int32_t Int32
Signed integer type of 32 bits.