12#ifndef ARCANE_PARALLEL_THREAD_SHAREDMEMORYMESSAGEQUEUE_H
13#define ARCANE_PARALLEL_THREAD_SHAREDMEMORYMESSAGEQUEUE_H
17#include "arcane/utils/TraceAccessor.h"
18#include "arcane/utils/Array.h"
20#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
22#include "arcane/core/ISerializer.h"
48class ARCANE_THREAD_EXPORT SharedMemoryMessageQueue
57 SharedMemoryMessageQueue()
59 , m_atomic_request_id(0)
61 ~SharedMemoryMessageQueue()
override;
65 void init(
Int32 nb_thread)
override;
68 bool is_non_blockign)
override;
83 Int32 m_nb_thread = 0;
85 std::atomic<Int64> m_atomic_request_id;
91 return m_sub_queues[rank.
value()];
93 Int64 _getNextRequestId()
95 return m_atomic_request_id.fetch_add(1);
98 SubQueue* _getSourceSubQueue(
const MP::PointToPointMessageInfo& message);
99 SubQueue* _getDestinationSubQueue(
const MP::PointToPointMessageInfo& message);
118 explicit SortFunctor(
Int32 nb_thread)
119 : m_nb_thread(nb_thread)
131 if (r1->isRecv() && !r2->isRecv())
133 if (!r1->isRecv() && r2->isRecv())
135 Int32 i1 = _getQueueIndex(r1->orig(), r1->dest());
136 Int32 i2 = _getQueueIndex(r2->orig(), r2->dest());
138 return r1->id() < r2->id();
148 return thread1.
value() + (thread2.
value() * m_nb_thread);
163 , m_request_id(request_id)
168 , m_receive_buffer_info(buf)
175 , m_request_id(request_id)
180 , m_send_buffer_info(buf)
188 MessageTag tag() {
return m_tag; }
189 bool isRecv() {
return m_is_recv; }
190 bool isDone() {
return m_is_done; }
191 void setDone(
bool v) { m_is_done = v; }
192 SendBufferInfo sendBufferInfo() {
return m_send_buffer_info; }
193 ReceiveBufferInfo receiveBufferInfo() {
return m_receive_buffer_info; }
194 SharedMemoryMessageQueue::SubQueue* queue() {
return m_queue; }
195 void copyFromSender(SharedMemoryMessageRequest* sender);
196 Int64 id()
const {
return m_request_id; }
198 ISerializer* recvSerializer() {
return m_receive_buffer_info.serializer(); }
199 const ISerializer* sendSerializer() {
return m_send_buffer_info.serializer(); }
201 void setSource(MessageRank s)
218 bool m_is_done =
false;
220 bool m_is_destroyed =
false;
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
File containing declarations concerning the message passing model.
Modifiable view of an array of type T.
Interface of a message queue with threads.
Int32 value() const
Rank value.
bool isNull() const
True if the rank is uninitialized, corresponding to the default rank.
Information for sending/receiving a point-to-point message.
Receive buffer information.
File for messages from a rank in shared memory.
Message queue between ranks shared by a SharedMemoryParallelMng.
Message within SharedMemoryMessageQueue.
SharedMemoryMessageRequest(SubQueue *queue, Int64 request_id, MessageRank orig, MessageRank dest, MessageTag tag, ReceiveBufferInfo buf)
Create a send request.
SharedMemoryMessageRequest * matchingSendRequest()
Associated request in the case it is a receive resulting from a probe.
SharedMemoryMessageRequest(SubQueue *queue, Int64 request_id, MessageRank orig, MessageRank dest, MessageTag tag, SendBufferInfo buf)
Create a receive request.
Thread-based parallelism manager.
1D data vector with value semantics (STL style).
Declarations of types and methods used by message exchange mechanisms.
std::int64_t Int64
Signed integer type of 64 bits.
std::int32_t Int32
Signed integer type of 32 bits.