12#ifndef ARCANE_PARALLEL_THREAD_SHAREDMEMORYMESSAGEQUEUE_H
13#define ARCANE_PARALLEL_THREAD_SHAREDMEMORYMESSAGEQUEUE_H
17#include "arcane/utils/TraceAccessor.h"
18#include "arcane/utils/Array.h"
20#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
22#include "arcane/ISerializer.h"
23#include "arcane/Parallel.h"
47class ARCANE_THREAD_EXPORT SharedMemoryMessageQueue
56 SharedMemoryMessageQueue() : m_nb_thread(0), m_atomic_request_id(0){}
57 ~SharedMemoryMessageQueue()
override;
61 void init(
Int32 nb_thread)
override;
64 bool is_non_blockign)
override;
79 Int32 m_nb_thread = 0;
81 std::atomic<Int64> m_atomic_request_id;
87 return m_sub_queues[rank.
value()];
89 Int64 _getNextRequestId()
91 return m_atomic_request_id.fetch_add(1);
94 SubQueue* _getSourceSubQueue(
const MP::PointToPointMessageInfo& message);
95 SubQueue* _getDestinationSubQueue(
const MP::PointToPointMessageInfo& message);
111 explicit SortFunctor(
Int32 nb_thread) : m_nb_thread(nb_thread){}
122 if (r1->isRecv() && !r2->isRecv())
124 if (!r1->isRecv() && r2->isRecv())
126 Int32 i1 = _getQueueIndex(r1->orig(),r1->dest());
127 Int32 i2 = _getQueueIndex(r2->orig(),r2->dest());
129 return r1->id() < r2->id();
139 return thread1.
value() + (thread2.
value() * m_nb_thread);
149 : m_queue(queue), m_request_id(request_id), m_is_recv(true)
150 , m_orig(orig), m_dest(dest), m_tag(tag), m_receive_buffer_info(buf)
156 : m_queue(queue), m_request_id(request_id), m_is_recv(false)
157 , m_orig(orig), m_dest(dest), m_tag(tag), m_send_buffer_info(buf)
163 MessageTag tag() {
return m_tag; }
164 bool isRecv() {
return m_is_recv; }
165 bool isDone() {
return m_is_done; }
166 void setDone(
bool v) { m_is_done = v; }
167 SendBufferInfo sendBufferInfo() {
return m_send_buffer_info; }
168 ReceiveBufferInfo receiveBufferInfo() {
return m_receive_buffer_info; }
169 SharedMemoryMessageQueue::SubQueue* queue() {
return m_queue; }
170 void copyFromSender(SharedMemoryMessageRequest* sender);
171 Int64 id()
const {
return m_request_id; }
173 ISerializer* recvSerializer() {
return m_receive_buffer_info.serializer(); }
174 const ISerializer* sendSerializer() {
return m_send_buffer_info.serializer(); }
176 void setSource(MessageRank s)
192 bool m_is_done =
false;
194 bool m_is_destroyed =
false;
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Vue modifiable d'un tableau d'un type T.
Interface du gestionnaire de traces.
Interface d'une file de messages avec les threads.
Int32 value() const
Valeur du rang.
bool isNull() const
Vrai si rang non initialisé correspondant au rang par défaut.
Informations pour envoyer/recevoir un message point à point.
Informations des buffers de réception.
Informations des buffers d'envoie.
File pour les messages d'un rang en mémoire partagée.
File de messages entre les rangs partagés par un SharedMemoryParallelMng.
Message entre SharedMemoryMessageQueue.
SharedMemoryMessageRequest(SubQueue *queue, Int64 request_id, MessageRank orig, MessageRank dest, MessageTag tag, ReceiveBufferInfo buf)
Créé une requête d'envoie.
SharedMemoryMessageRequest * matchingSendRequest()
Requête associée dans le cas où c'est un receive issu d'un probe
SharedMemoryMessageRequest(SubQueue *queue, Int64 request_id, MessageRank orig, MessageRank dest, MessageTag tag, SendBufferInfo buf)
Créé une requête de réception.
Gestionnaire du parallélisme utilisant les threads.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
std::int64_t Int64
Type entier signé sur 64 bits.
std::int32_t Int32
Type entier signé sur 32 bits.