Arcane  v3.16.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
SharedMemoryMessageQueue.h
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SharedMemoryMessageQueue.h (C) 2000-2024 */
9/* */
10/* Implémentation d'une file de messages en mémoire partagée. */
11/*---------------------------------------------------------------------------*/
12#ifndef ARCANE_PARALLEL_THREAD_SHAREDMEMORYMESSAGEQUEUE_H
13#define ARCANE_PARALLEL_THREAD_SHAREDMEMORYMESSAGEQUEUE_H
14/*---------------------------------------------------------------------------*/
15/*---------------------------------------------------------------------------*/
16
17#include "arcane/utils/TraceAccessor.h"
18#include "arcane/utils/Array.h"
19
20#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
21
22#include "arcane/ISerializer.h"
23#include "arcane/Parallel.h"
24
25#include <algorithm>
26
27/*---------------------------------------------------------------------------*/
28/*---------------------------------------------------------------------------*/
29
31{
32
33/*---------------------------------------------------------------------------*/
34/*---------------------------------------------------------------------------*/
35
41
42/*---------------------------------------------------------------------------*/
43/*---------------------------------------------------------------------------*/
47class ARCANE_THREAD_EXPORT SharedMemoryMessageQueue
49{
50 public:
51
52 class SubQueue;
53
54 public:
55
56 SharedMemoryMessageQueue() : m_nb_thread(0), m_atomic_request_id(0){}
57 ~SharedMemoryMessageQueue() override;
58
59 public:
60
61 void init(Int32 nb_thread) override;
62 void waitAll(ArrayView<Request> requests) override;
63 void waitSome(Int32 rank,ArrayView<Request> requests,ArrayView<bool> requests_done,
64 bool is_non_blockign) override;
65 void setTraceMng(Int32 rank,ITraceMng* tm) override;
66
67 public:
68
69 Request addReceive(const PointToPointMessageInfo& message,ReceiveBufferInfo buf) override;
70 Request addSend(const PointToPointMessageInfo& message,SendBufferInfo buf) override;
71
72 public:
73
74 MessageId probe(const PointToPointMessageInfo& message) override;
75 MessageSourceInfo legacyProbe(const PointToPointMessageInfo& message) override;
76
77 private:
78
79 Int32 m_nb_thread = 0;
80 UniqueArray<SubQueue*> m_sub_queues;
81 std::atomic<Int64> m_atomic_request_id;
82
83 private:
84
85 SubQueue* _getSubQueue(MessageRank rank)
86 {
87 return m_sub_queues[rank.value()];
88 }
89 Int64 _getNextRequestId()
90 {
91 return m_atomic_request_id.fetch_add(1);
92 }
94 SubQueue* _getSourceSubQueue(const MP::PointToPointMessageInfo& message);
95 SubQueue* _getDestinationSubQueue(const MP::PointToPointMessageInfo& message);
96};
97
98/*---------------------------------------------------------------------------*/
99/*---------------------------------------------------------------------------*/
105class ARCANE_THREAD_EXPORT SharedMemoryMessageRequest
106{
107 public:
108 class SortFunctor
109 {
110 public:
111 explicit SortFunctor(Int32 nb_thread) : m_nb_thread(nb_thread){}
112 bool operator()(SharedMemoryMessageRequest* r1,SharedMemoryMessageRequest* r2) const
113 {
114 if (!r1){
115 if (!r2)
116 return true;
117 else
118 return false;
119 }
120 if (!r2)
121 return true;
122 if (r1->isRecv() && !r2->isRecv())
123 return true;
124 if (!r1->isRecv() && r2->isRecv())
125 return false;
126 Int32 i1 = _getQueueIndex(r1->orig(),r1->dest());
127 Int32 i2 = _getQueueIndex(r2->orig(),r2->dest());
128 if (i1==i2)
129 return r1->id() < r2->id();
130 return i1 < i2;
131 }
132 Int32 _getQueueIndex(MessageRank thread1,MessageRank thread2) const
133 {
134 if (thread1.isNull())
135 ARCANE_FATAL("Null rank for thread1");
136 if (thread2.isNull())
137 ARCANE_FATAL("Null rank for thread2");
138 // TODO: gérer dest()==A_NULL_RANK.
139 return thread1.value() + (thread2.value() * m_nb_thread);
140 }
141 Int32 m_nb_thread;
142 };
143 public:
144 using SubQueue = SharedMemoryMessageQueue::SubQueue;
145 public:
147 SharedMemoryMessageRequest(SubQueue* queue,Int64 request_id,MessageRank orig,
149 : m_queue(queue), m_request_id(request_id), m_is_recv(true)
150 , m_orig(orig), m_dest(dest), m_tag(tag), m_receive_buffer_info(buf)
151 {
152 }
153
154 SharedMemoryMessageRequest(SubQueue* queue,Int64 request_id,MessageRank orig,
156 : m_queue(queue), m_request_id(request_id), m_is_recv(false)
157 , m_orig(orig), m_dest(dest), m_tag(tag), m_send_buffer_info(buf)
158 {
159 }
160 public:
161 MessageRank orig() { return m_orig; }
162 MessageRank dest() { return m_dest; }
163 MessageTag tag() { return m_tag; }
164 bool isRecv() { return m_is_recv; }
165 bool isDone() { return m_is_done; }
166 void setDone(bool v) { m_is_done = v; }
167 SendBufferInfo sendBufferInfo() { return m_send_buffer_info; }
168 ReceiveBufferInfo receiveBufferInfo() { return m_receive_buffer_info; }
169 SharedMemoryMessageQueue::SubQueue* queue() { return m_queue; }
170 void copyFromSender(SharedMemoryMessageRequest* sender);
171 Int64 id() const { return m_request_id; }
172 void destroy();
173 ISerializer* recvSerializer() { return m_receive_buffer_info.serializer(); }
174 const ISerializer* sendSerializer() { return m_send_buffer_info.serializer(); }
175 // Dans le cas ou dest()==A_NULL_RANK, positionne une fois le message recu le rang d'origine.
176 void setSource(MessageRank s)
177 {
178 if (isRecv())
179 m_dest = s;
180 }
182 SharedMemoryMessageRequest* matchingSendRequest() { return m_matching_send_request; }
183 void setMatchingSendRequest(SharedMemoryMessageRequest* r) { m_matching_send_request = r; }
184
185 private:
186 SubQueue* m_queue;
187 Int64 m_request_id;
188 bool m_is_recv;
189 MessageRank m_orig;
190 MessageRank m_dest;
191 MessageTag m_tag;
192 bool m_is_done = false;
193 SharedMemoryMessageRequest* m_matching_send_request = nullptr;
194 bool m_is_destroyed = false;
195 SendBufferInfo m_send_buffer_info;
196 ReceiveBufferInfo m_receive_buffer_info;
197};
198
199/*---------------------------------------------------------------------------*/
200/*---------------------------------------------------------------------------*/
201
202} // End namespace Arcane::MessagePassing
203
204/*---------------------------------------------------------------------------*/
205/*---------------------------------------------------------------------------*/
206
207#endif
208
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Vue modifiable d'un tableau d'un type T.
Interface du gestionnaire de traces.
Interface d'une file de messages avec les threads.
Int32 value() const
Valeur du rang.
Definition MessageRank.h:72
bool isNull() const
Vrai si rang non initialisé correspondant au rang par défaut.
Definition MessageRank.h:78
Informations pour envoyer/recevoir un message point à point.
Informations des buffers de réception.
Requête d'un message.
Definition Request.h:77
File pour les messages d'un rang en mémoire partagée.
File de messages entre les rangs partagés par un SharedMemoryParallelMng.
SharedMemoryMessageRequest(SubQueue *queue, Int64 request_id, MessageRank orig, MessageRank dest, MessageTag tag, ReceiveBufferInfo buf)
Crée une requête de réception.
SharedMemoryMessageRequest * matchingSendRequest()
Requête associée dans le cas où c'est un receive issu d'un probe
SharedMemoryMessageRequest(SubQueue *queue, Int64 request_id, MessageRank orig, MessageRank dest, MessageTag tag, SendBufferInfo buf)
Crée une requête d'envoi.
Gestionnaire du parallélisme utilisant les threads.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Definition Parallel.h:50
std::int64_t Int64
Type entier signé sur 64 bits.
std::int32_t Int32
Type entier signé sur 32 bits.