Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
SharedMemoryMessageQueue.h
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SharedMemoryMessageQueue.h (C) 2000-2024 */
9/* */
10/* Implementation of a message queue in shared memory. */
11/*---------------------------------------------------------------------------*/
12#ifndef ARCANE_PARALLEL_THREAD_SHAREDMEMORYMESSAGEQUEUE_H
13#define ARCANE_PARALLEL_THREAD_SHAREDMEMORYMESSAGEQUEUE_H
14/*---------------------------------------------------------------------------*/
15/*---------------------------------------------------------------------------*/
16
17#include "arcane/utils/TraceAccessor.h"
18#include "arcane/utils/Array.h"
19
20#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
21
22#include "arcane/core/ISerializer.h"
24
25#include <algorithm>
26
27/*---------------------------------------------------------------------------*/
28/*---------------------------------------------------------------------------*/
29
31{
32
33/*---------------------------------------------------------------------------*/
34/*---------------------------------------------------------------------------*/
35
41
42/*---------------------------------------------------------------------------*/
43/*---------------------------------------------------------------------------*/
44
48class ARCANE_THREAD_EXPORT SharedMemoryMessageQueue
50{
51 public:
52
53 class SubQueue;
54
55 public:
56
57 SharedMemoryMessageQueue()
58 : m_nb_thread(0)
59 , m_atomic_request_id(0)
60 {}
61 ~SharedMemoryMessageQueue() override;
62
63 public:
64
65 void init(Int32 nb_thread) override;
66 void waitAll(ArrayView<Request> requests) override;
67 void waitSome(Int32 rank, ArrayView<Request> requests, ArrayView<bool> requests_done,
68 bool is_non_blockign) override;
69 void setTraceMng(Int32 rank, ITraceMng* tm) override;
70
71 public:
72
73 Request addReceive(const PointToPointMessageInfo& message, ReceiveBufferInfo buf) override;
74 Request addSend(const PointToPointMessageInfo& message, SendBufferInfo buf) override;
75
76 public:
77
78 MessageId probe(const PointToPointMessageInfo& message) override;
79 MessageSourceInfo legacyProbe(const PointToPointMessageInfo& message) override;
80
81 private:
82
83 Int32 m_nb_thread = 0;
84 UniqueArray<SubQueue*> m_sub_queues;
85 std::atomic<Int64> m_atomic_request_id;
86
87 private:
88
89 SubQueue* _getSubQueue(MessageRank rank)
90 {
91 return m_sub_queues[rank.value()];
92 }
93 Int64 _getNextRequestId()
94 {
95 return m_atomic_request_id.fetch_add(1);
96 }
98 SubQueue* _getSourceSubQueue(const MP::PointToPointMessageInfo& message);
99 SubQueue* _getDestinationSubQueue(const MP::PointToPointMessageInfo& message);
100};
101
102/*---------------------------------------------------------------------------*/
103/*---------------------------------------------------------------------------*/
104
110class ARCANE_THREAD_EXPORT SharedMemoryMessageRequest
111{
112 public:
113
114 class SortFunctor
115 {
116 public:
117
118 explicit SortFunctor(Int32 nb_thread)
119 : m_nb_thread(nb_thread)
120 {}
121 bool operator()(SharedMemoryMessageRequest* r1, SharedMemoryMessageRequest* r2) const
122 {
123 if (!r1) {
124 if (!r2)
125 return true;
126 else
127 return false;
128 }
129 if (!r2)
130 return true;
131 if (r1->isRecv() && !r2->isRecv())
132 return true;
133 if (!r1->isRecv() && r2->isRecv())
134 return false;
135 Int32 i1 = _getQueueIndex(r1->orig(), r1->dest());
136 Int32 i2 = _getQueueIndex(r2->orig(), r2->dest());
137 if (i1 == i2)
138 return r1->id() < r2->id();
139 return i1 < i2;
140 }
141 Int32 _getQueueIndex(MessageRank thread1, MessageRank thread2) const
142 {
143 if (thread1.isNull())
144 ARCANE_FATAL("Null rank for thread1");
145 if (thread2.isNull())
146 ARCANE_FATAL("Null rank for thread2");
147 // TODO: handle dest()==A_NULL_RANK.
148 return thread1.value() + (thread2.value() * m_nb_thread);
149 }
150 Int32 m_nb_thread;
151 };
152
153 public:
154
155 using SubQueue = SharedMemoryMessageQueue::SubQueue;
156
157 public:
158
160 SharedMemoryMessageRequest(SubQueue* queue, Int64 request_id, MessageRank orig,
162 : m_queue(queue)
163 , m_request_id(request_id)
164 , m_is_recv(true)
165 , m_orig(orig)
166 , m_dest(dest)
167 , m_tag(tag)
168 , m_receive_buffer_info(buf)
169 {
170 }
171
172 SharedMemoryMessageRequest(SubQueue* queue, Int64 request_id, MessageRank orig,
174 : m_queue(queue)
175 , m_request_id(request_id)
176 , m_is_recv(false)
177 , m_orig(orig)
178 , m_dest(dest)
179 , m_tag(tag)
180 , m_send_buffer_info(buf)
181 {
182 }
183
184 public:
185
186 MessageRank orig() { return m_orig; }
187 MessageRank dest() { return m_dest; }
188 MessageTag tag() { return m_tag; }
189 bool isRecv() { return m_is_recv; }
190 bool isDone() { return m_is_done; }
191 void setDone(bool v) { m_is_done = v; }
192 SendBufferInfo sendBufferInfo() { return m_send_buffer_info; }
193 ReceiveBufferInfo receiveBufferInfo() { return m_receive_buffer_info; }
194 SharedMemoryMessageQueue::SubQueue* queue() { return m_queue; }
195 void copyFromSender(SharedMemoryMessageRequest* sender);
196 Int64 id() const { return m_request_id; }
197 void destroy();
198 ISerializer* recvSerializer() { return m_receive_buffer_info.serializer(); }
199 const ISerializer* sendSerializer() { return m_send_buffer_info.serializer(); }
200 // In the case where dest()==A_NULL_RANK, sets the original rank once the message is received.
201 void setSource(MessageRank s)
202 {
203 if (isRecv())
204 m_dest = s;
205 }
207 SharedMemoryMessageRequest* matchingSendRequest() { return m_matching_send_request; }
208 void setMatchingSendRequest(SharedMemoryMessageRequest* r) { m_matching_send_request = r; }
209
210 private:
211
212 SubQueue* m_queue;
213 Int64 m_request_id;
214 bool m_is_recv;
215 MessageRank m_orig;
216 MessageRank m_dest;
217 MessageTag m_tag;
218 bool m_is_done = false;
219 SharedMemoryMessageRequest* m_matching_send_request = nullptr;
220 bool m_is_destroyed = false;
221 SendBufferInfo m_send_buffer_info;
222 ReceiveBufferInfo m_receive_buffer_info;
223};
224
225/*---------------------------------------------------------------------------*/
226/*---------------------------------------------------------------------------*/
227
228} // End namespace Arcane::MessagePassing
229
230/*---------------------------------------------------------------------------*/
231/*---------------------------------------------------------------------------*/
232
233#endif
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
File containing declarations concerning the message passing model.
Modifiable view of an array of type T.
Int32 value() const
Rank value.
Definition MessageRank.h:76
bool isNull() const
True if the rank is uninitialized, corresponding to the default rank.
Definition MessageRank.h:82
Information for sending/receiving a point-to-point message.
Message queue between ranks shared by a SharedMemoryParallelMng.
SharedMemoryMessageRequest(SubQueue *queue, Int64 request_id, MessageRank orig, MessageRank dest, MessageTag tag, ReceiveBufferInfo buf)
Create a send request.
SharedMemoryMessageRequest * matchingSendRequest()
Associated request in the case it is a receive resulting from a probe.
SharedMemoryMessageRequest(SubQueue *queue, Int64 request_id, MessageRank orig, MessageRank dest, MessageTag tag, SendBufferInfo buf)
Create a receive request.
1D data vector with value semantics (STL style).
Declarations of types and methods used by message exchange mechanisms.
std::int64_t Int64
Signed integer type of 64 bits.
std::int32_t Int32
Signed integer type of 32 bits.