Arcane  v3.14.10.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
Messages.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* Messages.cc (C) 2000-2024 */
9/* */
10/* Identifiant d'un message point à point. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
15
16#include "arccore/base/NotSupportedException.h"
17#include "arccore/base/FatalErrorException.h"
18#include "arccore/base/NotImplementedException.h"
19
20#include "arccore/serialize/BasicSerializer.h"
21#include "arccore/serialize/BasicSerializerInternal.h"
22
23#include "arccore/message_passing/ISerializeDispatcher.h"
24#include "arccore/message_passing/IControlDispatcher.h"
25#include "arccore/message_passing/MessageId.h"
26#include "arccore/message_passing/PointToPointMessageInfo.h"
27
34/*---------------------------------------------------------------------------*/
35/*---------------------------------------------------------------------------*/
36
37namespace Arccore
38{
39
40/*---------------------------------------------------------------------------*/
41/*---------------------------------------------------------------------------*/
42
44{
45 public:
46
47 void doAllGather(MessagePassing::IMessagePassingMng* pm, const BasicSerializer* send_serializer,
48 BasicSerializer* receive_serializer);
49
// Runs mpAllGatherVariable() on 'send_values' and copies the gathered result
// into 'recv_buffer'.
//
// pm          : message passing manager forwarded to mpAllGatherVariable().
// send_values : values sent by the caller.
// recv_buffer : destination span; receives a copy of the gathered values.
//
// NOTE(review): the declaration of 'buf' (original line 53) is missing from
// this listing; presumably a temporary array of DataType - confirm.
50 template <typename DataType> void
51 _doGatherOne(MessagePassing::IMessagePassingMng* pm, Span<const DataType> send_values, Span<DataType> recv_buffer)
52 {
// Gather into the temporary array 'buf'.
54 mpAllGatherVariable(pm, send_values, buf);
// Copy the gathered values into the caller-provided span.
55 recv_buffer.copy(buf);
56 }
57};
58
59/*---------------------------------------------------------------------------*/
60/*---------------------------------------------------------------------------*/
61
// All-gather of the content of 'send_serializer' into 'receive_serializer'.
//
// pm                 : message passing manager used for the reduction and the gathers.
// send_serializer    : serializer whose per-type buffers are sent.
// receive_serializer : serializer whose buffers are allocated and then filled.
//
// Throws NotImplementedException when the reduced count of 'float16' or
// 'bfloat16' values is not zero.
62void BasicSerializeGatherMessage::
63doAllGather(MessagePassing::IMessagePassingMng* pm, const BasicSerializer* send_serializer,
64 BasicSerializer* receive_serializer)
65{
66 // TODO: does not yet support the 'Float16' and 'BFloat16' types
67 // because they are not supported in MPI messages.
68
69 const BasicSerializer* sbuf = send_serializer;
70 BasicSerializer* recv_buf = receive_serializer;
71 BasicSerializer::Impl2* sbuf_p2 = sbuf->m_p2;
72 BasicSerializer::Impl2* recv_p2 = recv_buf->m_p2;
73
// Views on the data to send, one per serialized data type.
74 Span<const Real> send_real = sbuf_p2->realBytes();
75 Span<const Int16> send_int16 = sbuf_p2->int16Bytes();
76 Span<const Int32> send_int32 = sbuf_p2->int32Bytes();
77 Span<const Int64> send_int64 = sbuf_p2->int64Bytes();
78 Span<const Byte> send_byte = sbuf_p2->byteBytes();
79 Span<const Int8> send_int8 = sbuf_p2->int8Bytes();
80 Span<const Float16> send_float16 = sbuf_p2->float16Bytes();
81 Span<const BFloat16> send_bfloat16 = sbuf_p2->bfloat16Bytes();
82 Span<const Float32> send_float32 = sbuf_p2->float32Bytes();
83
// Local element count of each type, in a fixed order that is reused below.
84 Int64 sizes[9];
85 sizes[0] = send_real.size();
86 sizes[1] = send_int16.size();
87 sizes[2] = send_int32.size();
88 sizes[3] = send_int64.size();
89 sizes[4] = send_byte.size();
90 sizes[5] = send_int8.size();
91 sizes[6] = send_float16.size();
92 sizes[7] = send_bfloat16.size();
93 sizes[8] = send_float32.size();
94
// In-place sum reduction: afterwards 'sizes' holds the summed counts.
95 mpAllReduce(pm, MessagePassing::ReduceSum, Int64ArrayView(9, sizes));
96
97 Int64 recv_nb_real = sizes[0];
98 Int64 recv_nb_int16 = sizes[1];
99 Int64 recv_nb_int32 = sizes[2];
100 Int64 recv_nb_int64 = sizes[3];
101 Int64 recv_nb_byte = sizes[4];
102 Int64 recv_nb_int8 = sizes[5];
103 Int64 recv_nb_float16 = sizes[6];
104 Int64 recv_nb_bfloat16 = sizes[7];
105 Int64 recv_nb_float32 = sizes[8];
106
// Reject the two unsupported types as soon as a reduced count is non-zero.
107 if (recv_nb_float16 != 0)
108 ARCCORE_THROW(NotImplementedException, "AllGather with serialized type 'float16' is not yet implemented");
109 if (recv_nb_bfloat16 != 0)
110 ARCCORE_THROW(NotImplementedException, "AllGather with serialized type 'bfloat16' is not yet implemented");
111
// Size the receive buffers from the reduced counts.
112 recv_p2->allocateBuffer(recv_nb_real, recv_nb_int16, recv_nb_int32, recv_nb_int64, recv_nb_byte,
113 recv_nb_int8, recv_nb_float16, recv_nb_bfloat16, recv_nb_float32);
114
115 auto recv_p = recv_buf->_p();
116
// One gather per supported type; float16 and bfloat16 are not gathered.
117 _doGatherOne(pm, send_real, recv_p->getRealBuffer());
118 _doGatherOne(pm, send_int32, recv_p->getInt32Buffer());
119 _doGatherOne(pm, send_int16, recv_p->getInt16Buffer());
120 _doGatherOne(pm, send_int64, recv_p->getInt64Buffer());
121 _doGatherOne(pm, send_byte, recv_p->getByteBuffer());
122 _doGatherOne(pm, send_int8, recv_p->getInt8Buffer());
123 _doGatherOne(pm, send_float32, recv_p->getFloat32Buffer());
124}
125
126} // namespace Arccore
127
128/*---------------------------------------------------------------------------*/
129/*---------------------------------------------------------------------------*/
130
132{
133
134/*---------------------------------------------------------------------------*/
135/*---------------------------------------------------------------------------*/
// mpCreateRequestListRef(): returns the request list created by the control
// dispatcher of 'pm'.
// NOTE(review): the line carrying the function name and parameters (original
// line 142) is missing from this listing.
141Ref<IRequestList>
143{
144 auto d = pm->dispatchers()->controlDispatcher();
145 return d->createRequestListRef();
146}
147
148/*---------------------------------------------------------------------------*/
149/*---------------------------------------------------------------------------*/
150
// mpWaitAll(): forwards 'requests' to waitAllRequests() of the control
// dispatcher of 'pm'.
// NOTE(review): the signature line (original line 151) is missing from this
// listing.
152{
153 auto d = pm->dispatchers()->controlDispatcher();
154 d->waitAllRequests(requests);
155}
156
157/*---------------------------------------------------------------------------*/
158/*---------------------------------------------------------------------------*/
159
// mpWait() for a single request: wraps 'request' in a one-element view and
// delegates to mpWaitAll().
// NOTE(review): the signature line (original line 160) is missing from this
// listing.
161{
162 mpWaitAll(pm, ArrayView<Request>(1, &request));
163}
164
165/*---------------------------------------------------------------------------*/
166/*---------------------------------------------------------------------------*/
167
// mpWaitSome(): forwards to waitSomeRequests() of the control dispatcher with
// the last argument set to false.
// NOTE(review): the signature line (original line 168) is missing from this
// listing; the last argument presumably selects the blocking mode - confirm.
169{
170 auto d = pm->dispatchers()->controlDispatcher();
171 d->waitSomeRequests(requests, indexes, false);
172}
173
174/*---------------------------------------------------------------------------*/
175/*---------------------------------------------------------------------------*/
176
// mpTestSome(): forwards to waitSomeRequests() of the control dispatcher with
// the last argument set to true.
// NOTE(review): the signature line (original line 177) is missing from this
// listing; the last argument presumably selects the non-blocking mode - confirm.
178{
179 auto d = pm->dispatchers()->controlDispatcher();
180 d->waitSomeRequests(requests, indexes, true);
181}
182
183/*---------------------------------------------------------------------------*/
184/*---------------------------------------------------------------------------*/
185
// mpWait() with an explicit wait type: dispatches on 'w_type' to mpWaitAll(),
// mpWaitSome() or mpTestSome().
// NOTE(review): the first signature line (original line 186) and the case
// label on original line 197 are missing from this listing; the label
// presumably guards the mpTestSome() call - confirm against the real file.
187 ArrayView<bool> indexes, eWaitType w_type)
188{
189 switch (w_type) {
190 case WaitAll:
191 mpWaitAll(pm, requests);
// Every entry of 'indexes' is set to true after the wait on all requests.
192 indexes.fill(true);
193 break;
194 case WaitSome:
195 mpWaitSome(pm, requests, indexes);
196 break;
198 mpTestSome(pm, requests, indexes);
199 break;
200 }
201}
202
203/*---------------------------------------------------------------------------*/
204/*---------------------------------------------------------------------------*/
205
// mpProbe(): returns the result of probe(message) on the control dispatcher
// of 'pm'.
// NOTE(review): the line carrying the function name and parameters (original
// line 207) is missing from this listing.
206MessageId
208{
209 auto d = pm->dispatchers()->controlDispatcher();
210 return d->probe(message);
211}
212
213/*---------------------------------------------------------------------------*/
214/*---------------------------------------------------------------------------*/
215
// mpLegacyProbe(): returns the result of legacyProbe(message) on the control
// dispatcher of 'pm'.
// NOTE(review): the line carrying the function name and parameters (original
// line 217) is missing from this listing.
216MessageSourceInfo
218{
219 auto d = pm->dispatchers()->controlDispatcher();
220 return d->legacyProbe(message);
221}
222
223/*---------------------------------------------------------------------------*/
224/*---------------------------------------------------------------------------*/
225
// mpSplit(): returns the IMessagePassingMng produced by commSplit(keep) on
// the control dispatcher of 'pm'.
// NOTE(review): the line carrying the function name and parameters (original
// line 227) is missing from this listing.
226IMessagePassingMng*
228{
229 auto d = pm->dispatchers()->controlDispatcher();
230 return d->commSplit(keep);
231}
232
233/*---------------------------------------------------------------------------*/
234/*---------------------------------------------------------------------------*/
235
// mpBarrier(): calls barrier() on the control dispatcher of 'pm'.
// NOTE(review): the signature line (original line 236) is missing from this
// listing.
237{
238 auto d = pm->dispatchers()->controlDispatcher();
239 d->barrier();
240}
241
242/*---------------------------------------------------------------------------*/
243/*---------------------------------------------------------------------------*/
244
// mpNonBlockingBarrier(): returns the Request produced by
// nonBlockingBarrier() on the control dispatcher of 'pm'.
// NOTE(review): the line carrying the function name and parameters (original
// line 246) is missing from this listing.
245Request
247{
248 auto d = pm->dispatchers()->controlDispatcher();
249 return d->nonBlockingBarrier();
250}
251
252/*---------------------------------------------------------------------------*/
253/*---------------------------------------------------------------------------*/
254
// mpCreateSerializeMessageListRef(): fetches the serialize dispatcher of 'pm'.
// NOTE(review): the signature lines (original lines 255-256) and the
// statement on original line 259 are missing from this listing; the latter
// presumably returns d->createSerializeMessageListRef() - confirm.
257{
258 auto d = pm->dispatchers()->serializeDispatcher();
260}
261
262/*---------------------------------------------------------------------------*/
263/*---------------------------------------------------------------------------*/
264
// mpSend() for a serializer: returns the Request produced by
// sendSerializer(values, message) on the serialize dispatcher of 'pm'.
// NOTE(review): the first signature line (original line 266) is missing from
// this listing.
265Request
267 const PointToPointMessageInfo& message)
268{
269 auto d = pm->dispatchers()->serializeDispatcher();
270 return d->sendSerializer(values, message);
271}
272
273/*---------------------------------------------------------------------------*/
274/*---------------------------------------------------------------------------*/
275
// mpReceive() for a serializer: returns the Request produced by
// receiveSerializer(values, message) on the serialize dispatcher of 'pm'.
// NOTE(review): the first signature line (original line 277) is missing from
// this listing.
276Request
278 const PointToPointMessageInfo& message)
279{
280 auto d = pm->dispatchers()->serializeDispatcher();
281 return d->receiveSerializer(values, message);
282}
283
284/*---------------------------------------------------------------------------*/
285/*---------------------------------------------------------------------------*/
286
// Out-of-line definition of IControlDispatcher::legacyProbe().
// It ignores its argument and unconditionally throws NotSupportedException.
287MessageSourceInfo IControlDispatcher::
288legacyProbe(const PointToPointMessageInfo&)
289{
290 ARCCORE_THROW(NotSupportedException, "pure virtual call to legacyProbe()");
291}
292
293/*---------------------------------------------------------------------------*/
294/*---------------------------------------------------------------------------*/
295
// All-gather for serializers.
//
// Both serializers must be BasicSerializer instances: each is checked with
// dynamic_cast and ARCCORE_FATAL is raised when the cast fails. The work is
// then delegated to doAllGather() on 'message'.
//
// NOTE(review): the declaration of 'message' (original line 304) is missing
// from this listing; presumably a local BasicSerializeGatherMessage - confirm.
296void mpAllGather(IMessagePassingMng* pm, const ISerializer* send_serializer, ISerializer* receive_serialize)
297{
298 auto* s = dynamic_cast<const BasicSerializer*>(send_serializer);
299 if (!s)
300 ARCCORE_FATAL("send_serializer is not a BasicSerializer");
301 auto* r = dynamic_cast<BasicSerializer*>(receive_serialize);
302 if (!r)
303 ARCCORE_FATAL("receive_serializer is not a BasicSerializer");
305 message.doAllGather(pm, s, r);
306}
307
308/*---------------------------------------------------------------------------*/
309/*---------------------------------------------------------------------------*/
// File-local helper returning the ITypeDispatcher for 'DataType' obtained
// from the dispatchers of 'pm'.
// ARCCORE_CHECK_POINTER is applied to 'pm' before use and to the dispatcher
// before it is returned.
310namespace
311{
312 template <typename DataType> inline ITypeDispatcher<DataType>*
313 _getDispatcher(IMessagePassingMng* pm)
314 {
315 ARCCORE_CHECK_POINTER(pm);
// 'x' is always null; presumably only its type is used, to select the
// dispatcher() overload - confirm.
316 DataType* x = nullptr;
317 auto* dispatcher = pm->dispatchers()->dispatcher(x);
318 ARCCORE_CHECK_POINTER(dispatcher);
319 return dispatcher;
320 }
321} // namespace
322
// Generates, for the element type 'type', the definitions of the typed
// message-passing free functions (gather, all-gather, scatter, all-reduce,
// broadcast, send, receive, all-to-all and their non-blocking variants).
// Each generated function fetches the dispatcher with _getDispatcher<type>(pm)
// and forwards its arguments to the dispatcher method of the matching name.
// The overloads of mpSend()/mpReceive() without 'is_blocked' pass true for it.
323#define ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(type) \
324 void mpAllGather(IMessagePassingMng* pm, Span<const type> send_buf, Span<type> recv_buf) \
325 { \
326 _getDispatcher<type>(pm)->allGather(send_buf, recv_buf); \
327 } \
328 void mpGather(IMessagePassingMng* pm, Span<const type> send_buf, Span<type> recv_buf, Int32 rank) \
329 { \
330 _getDispatcher<type>(pm)->gather(send_buf, recv_buf, rank); \
331 } \
332 Request mpNonBlockingAllGather(IMessagePassingMng* pm, Span<const type> send_buf, Span<type> recv_buf) \
333 { \
334 return _getDispatcher<type>(pm)->nonBlockingAllGather(send_buf, recv_buf); \
335 } \
336 Request mpNonBlockingGather(IMessagePassingMng* pm, Span<const type> send_buf, Span<type> recv_buf, Int32 rank) \
337 { \
338 return _getDispatcher<type>(pm)->nonBlockingGather(send_buf, recv_buf, rank); \
339 } \
340 void mpAllGatherVariable(IMessagePassingMng* pm, Span<const type> send_buf, Array<type>& recv_buf) \
341 { \
342 _getDispatcher<type>(pm)->allGatherVariable(send_buf, recv_buf); \
343 } \
344 void mpGatherVariable(IMessagePassingMng* pm, Span<const type> send_buf, Array<type>& recv_buf, Int32 rank) \
345 { \
346 _getDispatcher<type>(pm)->gatherVariable(send_buf, recv_buf, rank); \
347 } \
348 Request mpGather(IMessagePassingMng* pm, GatherMessageInfo<type>& gather_info) \
349 { \
350 return _getDispatcher<type>(pm)->gather(gather_info); \
351 } \
352 void mpScatterVariable(IMessagePassingMng* pm, Span<const type> send_buf, Span<type> recv_buf, Int32 root) \
353 { \
354 return _getDispatcher<type>(pm)->scatterVariable(send_buf, recv_buf, root); \
355 } \
356 type mpAllReduce(IMessagePassingMng* pm, eReduceType rt, type v) \
357 { \
358 return _getDispatcher<type>(pm)->allReduce(rt, v); \
359 } \
360 void mpAllReduce(IMessagePassingMng* pm, eReduceType rt, Span<type> buf) \
361 { \
362 _getDispatcher<type>(pm)->allReduce(rt, buf); \
363 } \
364 Request mpNonBlockingAllReduce(IMessagePassingMng* pm, eReduceType rt, Span<const type> send_buf, Span<type> recv_buf) \
365 { \
366 return _getDispatcher<type>(pm)->nonBlockingAllReduce(rt, send_buf, recv_buf); \
367 } \
368 void mpBroadcast(IMessagePassingMng* pm, Span<type> send_buf, Int32 rank) \
369 { \
370 _getDispatcher<type>(pm)->broadcast(send_buf, rank); \
371 } \
372 Request mpNonBlockingBroadcast(IMessagePassingMng* pm, Span<type> send_buf, Int32 rank) \
373 { \
374 return _getDispatcher<type>(pm)->nonBlockingBroadcast(send_buf, rank); \
375 } \
376 void mpSend(IMessagePassingMng* pm, Span<const type> values, Int32 rank) \
377 { \
378 _getDispatcher<type>(pm)->send(values, rank, true); \
379 } \
380 void mpReceive(IMessagePassingMng* pm, Span<type> values, Int32 rank) \
381 { \
382 _getDispatcher<type>(pm)->receive(values, rank, true); \
383 } \
384 Request mpSend(IMessagePassingMng* pm, Span<const type> values, Int32 rank, bool is_blocked) \
385 { \
386 return _getDispatcher<type>(pm)->send(values, rank, is_blocked); \
387 } \
388 Request mpSend(IMessagePassingMng* pm, Span<const type> values, const PointToPointMessageInfo& message) \
389 { \
390 return _getDispatcher<type>(pm)->send(values, message); \
391 } \
392 Request mpReceive(IMessagePassingMng* pm, Span<type> values, Int32 rank, bool is_blocked) \
393 { \
394 return _getDispatcher<type>(pm)->receive(values, rank, is_blocked); \
395 } \
396 Request mpReceive(IMessagePassingMng* pm, Span<type> values, const PointToPointMessageInfo& message) \
397 { \
398 return _getDispatcher<type>(pm)->receive(values, message); \
399 } \
400 void mpAllToAll(IMessagePassingMng* pm, Span<const type> send_buf, Span<type> recv_buf, Int32 count) \
401 { \
402 return _getDispatcher<type>(pm)->allToAll(send_buf, recv_buf, count); \
403 } \
404 Request mpNonBlockingAllToAll(IMessagePassingMng* pm, Span<const type> send_buf, Span<type> recv_buf, Int32 count) \
405 { \
406 return _getDispatcher<type>(pm)->nonBlockingAllToAll(send_buf, recv_buf, count); \
407 } \
408 void mpAllToAllVariable(IMessagePassingMng* pm, Span<const type> send_buf, ConstArrayView<Int32> send_count, \
409 ConstArrayView<Int32> send_index, Span<type> recv_buf, \
410 ConstArrayView<Int32> recv_count, ConstArrayView<Int32> recv_index) \
411 { \
412 _getDispatcher<type>(pm)->allToAllVariable(send_buf, send_count, send_index, recv_buf, recv_count, recv_index); \
413 } \
414 Request mpNonBlockingAllToAllVariable(IMessagePassingMng* pm, Span<const type> send_buf, ConstArrayView<Int32> send_count, \
415 ConstArrayView<Int32> send_index, Span<type> recv_buf, \
416 ConstArrayView<Int32> recv_count, ConstArrayView<Int32> recv_index) \
417 { \
418 return _getDispatcher<type>(pm)->nonBlockingAllToAllVariable(send_buf, send_count, send_index, recv_buf, recv_count, recv_index); \
419 }
420
421/*---------------------------------------------------------------------------*/
422/*---------------------------------------------------------------------------*/
423
// Instantiate the typed message-passing functions for each element type:
// character types, integer types, floating-point types, then the 16-bit
// floating-point types BFloat16 and Float16.
424ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(char)
425ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(signed char)
426ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(unsigned char)
427
428ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(short)
429ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(unsigned short)
430ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(int)
431ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(unsigned int)
432ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(long)
433ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(unsigned long)
434ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(long long)
435ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(unsigned long long)
436
437ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(float)
438ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(double)
439ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(long double)
440
441ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(BFloat16)
442ARCCORE_GENERATE_MESSAGEPASSING_DEFINITION(Float16)
443
444/*---------------------------------------------------------------------------*/
445/*---------------------------------------------------------------------------*/
446
447} // namespace Arccore::MessagePassing
448
449/*---------------------------------------------------------------------------*/
450/*---------------------------------------------------------------------------*/
Liste des fonctions d'échange de message.
virtual Ref< Parallel::IRequestList > createRequestListRef()=0
Crée une liste de requêtes pour ce gestionnaire.
virtual MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message)=0
Sonde si des messages sont disponibles.
virtual UniqueArray< Integer > waitSomeRequests(ArrayView< Request > rvalues)=0
Bloque en attendant qu'une des requêtes rvalues soit terminée.
virtual Ref< ISerializeMessageList > createSerializeMessageListRef()=0
Crée une liste pour gérer les 'ISerializeMessage'.
virtual void waitAllRequests(ArrayView< Request > rvalues)=0
Bloque en attendant que les requêtes rvalues soient terminées.
virtual MessageId probe(const PointToPointMessageInfo &message)=0
Sonde si des messages sont disponibles.
virtual void barrier()=0
Effectue une barrière.
Vue modifiable d'un tableau d'un type T.
void fill(const T &o) noexcept
Remplit le tableau avec la valeur o.
Implémentation basique de 'ISerializer'.
Interface du gestionnaire des échanges de messages.
Informations pour envoyer/recevoir un message point à point.
Requête d'un message.
Definition Request.h:77
Exception lorsqu'une fonction n'est pas implémentée.
Exception lorsqu'une opération n'est pas supportée.
Référence à une instance.
ARCCORE_HOST_DEVICE void copy(const U &copy_array)
Recopie le tableau copy_array dans l'instance.
Definition Span.h:357
constexpr ARCCORE_HOST_DEVICE SizeType size() const noexcept
Retourne la taille du tableau.
Definition Span.h:209
Vue d'un tableau d'éléments de type T.
Definition Span.h:510
Vecteur 1D de données avec sémantique par valeur (style STL).
Espace de nommage contenant les types et déclarations qui gèrent le mécanisme de parallélisme par échange de message.
Request mpNonBlockingBarrier(IMessagePassingMng *pm)
Effectue une barrière non bloquante.
Definition Messages.cc:246
void mpBarrier(IMessagePassingMng *pm)
Effectue une barrière.
Definition Messages.cc:236
void mpAllGather(IMessagePassingMng *pm, const ISerializer *send_serializer, ISerializer *receive_serialize)
Message allGather() pour une sérialisation.
Definition Messages.cc:296
IMessagePassingMng * mpSplit(IMessagePassingMng *pm, bool keep)
Crée une nouvelle instance de IMessagePassingMng.
Definition Messages.cc:227
MessageId mpProbe(IMessagePassingMng *pm, const PointToPointMessageInfo &message)
Teste si un message est disponible.
Definition Messages.cc:207
Request mpSend(IMessagePassingMng *pm, const ISerializer *values, const PointToPointMessageInfo &message)
Message d'envoi utilisant un ISerializer.
Definition Messages.cc:266
MessageSourceInfo mpLegacyProbe(IMessagePassingMng *pm, const PointToPointMessageInfo &message)
Teste si un message est disponible.
Definition Messages.cc:217
Ref< ISerializeMessageList > mpCreateSerializeMessageListRef(IMessagePassingMng *pm)
Crée une liste de messages de sérialisation.
Definition Messages.cc:256
void mpWaitAll(IMessagePassingMng *pm, ArrayView< Request > requests)
Bloque tant que les requêtes de requests ne sont pas terminées.
Definition Messages.cc:151
void mpTestSome(IMessagePassingMng *pm, ArrayView< Request > requests, ArrayView< bool > indexes)
Teste si des requêtes de request sont terminées.
Definition Messages.cc:177
@ WaitSome
Attend qu'au moins un message de la liste soit traité.
void mpWaitSome(IMessagePassingMng *pm, ArrayView< Request > requests, ArrayView< bool > indexes)
Bloque jusqu'à ce qu'au moins une des requêtes de request soit terminée.
Definition Messages.cc:168
void mpWait(IMessagePassingMng *pm, Request request)
Bloque jusqu'à ce que la requête request soit terminée.
Definition Messages.cc:160
@ ReduceSum
Somme des valeurs.
Request mpReceive(IMessagePassingMng *pm, ISerializer *values, const PointToPointMessageInfo &message)
Message de réception utilisant un ISerializer.
Definition Messages.cc:277
Ref< IRequestList > mpCreateRequestListRef(IMessagePassingMng *pm)
Crée une liste de requêtes.
Definition Messages.cc:142
Espace de nom de Arccore.
Definition ArcaneTypes.h:24
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition BaseTypes.h:121
std::int64_t Int64
Type entier signé sur 64 bits.