Arcane  v3.15.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
SharedMemoryMessageQueue.h
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SharedMemoryMessageQueue.h (C) 2000-2024 */
9/* */
10/* Implémentation d'une file de messages en mémoire partagée. */
11/*---------------------------------------------------------------------------*/
12#ifndef ARCANE_PARALLEL_THREAD_SHAREDMEMORYMESSAGEQUEUE_H
13#define ARCANE_PARALLEL_THREAD_SHAREDMEMORYMESSAGEQUEUE_H
14/*---------------------------------------------------------------------------*/
15/*---------------------------------------------------------------------------*/
16
17#include "arcane/utils/TraceAccessor.h"
18#include "arcane/utils/Array.h"
19
20#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
21
22#include "arcane/ISerializer.h"
23#include "arcane/Parallel.h"
24
25#include <algorithm>
26
27/*---------------------------------------------------------------------------*/
28/*---------------------------------------------------------------------------*/
29
31{
32
33/*---------------------------------------------------------------------------*/
34/*---------------------------------------------------------------------------*/
35
36class SharedMemoryParallelMng;
37class SharedMemoryMessageRequest;
38class SharedMemoryMessageQueue;
39using MessageRank = MP::MessageRank;
40using MessageTag = MP::MessageTag;
41
42/*---------------------------------------------------------------------------*/
43/*---------------------------------------------------------------------------*/
47class ARCANE_THREAD_EXPORT SharedMemoryMessageQueue
49{
50 public:
51
52 class SubQueue;
53
54 public:
55
56 SharedMemoryMessageQueue() : m_nb_thread(0), m_atomic_request_id(0){}
58
59 public:
60
61 void init(Int32 nb_thread) override;
62 void waitAll(ArrayView<Request> requests) override;
63 void waitSome(Int32 rank,ArrayView<Request> requests,ArrayView<bool> requests_done,
64 bool is_non_blockign) override;
65 void setTraceMng(Int32 rank,ITraceMng* tm) override;
66
67 public:
68
69 Request addReceive(const PointToPointMessageInfo& message,ReceiveBufferInfo buf) override;
70 Request addSend(const PointToPointMessageInfo& message,SendBufferInfo buf) override;
71
72 public:
73
74 MessageId probe(const PointToPointMessageInfo& message) override;
75 MessageSourceInfo legacyProbe(const PointToPointMessageInfo& message) override;
76
77 private:
78
79 Int32 m_nb_thread = 0;
80 UniqueArray<SubQueue*> m_sub_queues;
81 std::atomic<Int64> m_atomic_request_id;
82
83 private:
84
85 SubQueue* _getSubQueue(MessageRank rank)
86 {
87 return m_sub_queues[rank.value()];
88 }
89 Int64 _getNextRequestId()
90 {
91 return m_atomic_request_id.fetch_add(1);
92 }
94 SubQueue* _getSourceSubQueue(const MP::PointToPointMessageInfo& message);
95 SubQueue* _getDestinationSubQueue(const MP::PointToPointMessageInfo& message);
96};
97
98/*---------------------------------------------------------------------------*/
99/*---------------------------------------------------------------------------*/
105class ARCANE_THREAD_EXPORT SharedMemoryMessageRequest
106{
107 public:
109 {
110 public:
111 explicit SortFunctor(Int32 nb_thread) : m_nb_thread(nb_thread){}
113 {
114 if (!r1){
115 if (!r2)
116 return true;
117 else
118 return false;
119 }
120 if (!r2)
121 return true;
122 if (r1->isRecv() && !r2->isRecv())
123 return true;
124 if (!r1->isRecv() && r2->isRecv())
125 return false;
126 Int32 i1 = _getQueueIndex(r1->orig(),r1->dest());
127 Int32 i2 = _getQueueIndex(r2->orig(),r2->dest());
128 if (i1==i2)
129 return r1->id() < r2->id();
130 return i1 < i2;
131 }
132 Int32 _getQueueIndex(MessageRank thread1,MessageRank thread2) const
133 {
134 if (thread1.isNull())
135 ARCANE_FATAL("Null rank for thread1");
136 if (thread2.isNull())
137 ARCANE_FATAL("Null rank for thread2");
138 // TODO: gérer dest()==A_NULL_RANK.
139 return thread1.value() + (thread2.value() * m_nb_thread);
140 }
141 Int32 m_nb_thread;
142 };
143 public:
145 public:
149 : m_queue(queue), m_request_id(request_id), m_is_recv(true)
150 , m_orig(orig), m_dest(dest), m_tag(tag), m_receive_buffer_info(buf)
151 {
152 }
156 : m_queue(queue), m_request_id(request_id), m_is_recv(false)
157 , m_orig(orig), m_dest(dest), m_tag(tag), m_send_buffer_info(buf)
158 {
159 }
160 public:
161 MessageRank orig() { return m_orig; }
162 MessageRank dest() { return m_dest; }
163 MessageTag tag() { return m_tag; }
164 bool isRecv() { return m_is_recv; }
165 bool isDone() { return m_is_done; }
166 void setDone(bool v) { m_is_done = v; }
167 SendBufferInfo sendBufferInfo() { return m_send_buffer_info; }
168 ReceiveBufferInfo receiveBufferInfo() { return m_receive_buffer_info; }
169 SharedMemoryMessageQueue::SubQueue* queue() { return m_queue; }
170 void copyFromSender(SharedMemoryMessageRequest* sender);
171 Int64 id() const { return m_request_id; }
172 void destroy();
173 ISerializer* recvSerializer() { return m_receive_buffer_info.serializer(); }
174 const ISerializer* sendSerializer() { return m_send_buffer_info.serializer(); }
175 // Dans le cas ou dest()==A_NULL_RANK, positionne une fois le message recu le rang d'origine.
176 void setSource(MessageRank s)
177 {
178 if (isRecv())
179 m_dest = s;
180 }
182 SharedMemoryMessageRequest* matchingSendRequest() { return m_matching_send_request; }
183 void setMatchingSendRequest(SharedMemoryMessageRequest* r) { m_matching_send_request = r; }
184
185 private:
186 SubQueue* m_queue;
187 Int64 m_request_id;
188 bool m_is_recv;
189 MessageRank m_orig;
190 MessageRank m_dest;
191 MessageTag m_tag;
192 bool m_is_done = false;
193 SharedMemoryMessageRequest* m_matching_send_request = nullptr;
194 bool m_is_destroyed = false;
195 SendBufferInfo m_send_buffer_info;
196 ReceiveBufferInfo m_receive_buffer_info;
197};
198
199/*---------------------------------------------------------------------------*/
200/*---------------------------------------------------------------------------*/
201
202} // End namespace Arcane::MessagePassing
203
204/*---------------------------------------------------------------------------*/
205/*---------------------------------------------------------------------------*/
206
207#endif
208
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:149
Interface d'une file de messages avec les threads.
Informations des buffers de réception.
File pour les messages d'un rang en mémoire partagée.
File de messages entre les rangs partagés par un SharedMemoryParallelMng.
SharedMemoryMessageRequest(SubQueue *queue, Int64 request_id, MessageRank orig, MessageRank dest, MessageTag tag, ReceiveBufferInfo buf)
Créé une requête d'envoie.
SharedMemoryMessageRequest * matchingSendRequest()
Requête associée dans le cas où c'est un receive issu d'un probe
SharedMemoryMessageRequest(SubQueue *queue, Int64 request_id, MessageRank orig, MessageRank dest, MessageTag tag, SendBufferInfo buf)
Créé une requête de réception.
Interface du gestionnaire de traces.
Int32 value() const
Valeur du rang.
Definition MessageRank.h:72
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
Requête d'un message.
Definition Request.h:77
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Definition Parallel.h:94