Arcane  v3.16.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
SharedMemoryParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SharedMemoryParallelMng.cc (C) 2000-2025 */
9/* */
10/* Implémentation des messages en mode mémoire partagé. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
15
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/PlatformUtils.h"
19#include "arcane/utils/TraceInfo.h"
20#include "arcane/utils/NumericTypes.h"
21#include "arcane/utils/ArgumentException.h"
22#include "arcane/utils/FatalErrorException.h"
23#include "arcane/utils/ITraceMng.h"
24
25#include "arcane/parallel/IStat.h"
26
27#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
28#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
29
30#include "arcane/core/Timer.h"
31#include "arcane/core/IIOMng.h"
32#include "arcane/core/ISerializeMessageList.h"
33#include "arcane/core/IItemFamily.h"
34#include "arcane/core/internal/SerializeMessage.h"
35
36#include "arcane/impl/TimerMng.h"
37#include "arcane/impl/ParallelReplication.h"
38#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
39
40#include "arccore/message_passing/RequestListBase.h"
41#include "arccore/message_passing/SerializeMessageList.h"
42
43#include <map>
44
45/*---------------------------------------------------------------------------*/
46/*---------------------------------------------------------------------------*/
47
48namespace Arcane
49{
50extern "C++" ARCANE_IMPL_EXPORT IIOMng*
51arcaneCreateIOMng(IParallelMng* psm);
52}
53
55{
56
57/*---------------------------------------------------------------------------*/
58/*---------------------------------------------------------------------------*/
64{
66 public:
67
68 explicit RequestList(SharedMemoryParallelMng* pm)
69 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
70 m_local_rank(m_parallel_mng->commRank()){}
71 public:
72 void _wait(Parallel::eWaitType wait_type) override
73 {
74 switch(wait_type){
75 case Parallel::WaitAll:
76 return m_message_queue->waitAll(_requests());
78 return m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),false);
80 return m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),true);
81 }
82 }
83 private:
84 SharedMemoryParallelMng* m_parallel_mng;
85 ISharedMemoryMessageQueue* m_message_queue;
86 Int32 m_local_rank;
87};
88
89/*---------------------------------------------------------------------------*/
90/*---------------------------------------------------------------------------*/
91
92SharedMemoryParallelMng::
93SharedMemoryParallelMng(const SharedMemoryParallelMngBuildInfo& build_info)
94: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(build_info.rank,build_info.nb_rank))
95, m_trace(build_info.trace_mng)
96, m_thread_mng(build_info.thread_mng)
97, m_sequential_parallel_mng(build_info.sequential_parallel_mng)
98, m_timer_mng(nullptr)
99, m_replication(new ParallelReplication())
100, m_world_parallel_mng(build_info.world_parallel_mng)
101, m_io_mng(nullptr)
102, m_message_queue(build_info.message_queue)
103, m_is_parallel(build_info.nb_rank!=1)
104, m_rank(build_info.rank)
105, m_nb_rank(build_info.nb_rank)
106, m_is_initialized(false)
107, m_stat(Parallel::createDefaultStat())
108, m_thread_barrier(build_info.thread_barrier)
109, m_all_dispatchers(build_info.all_dispatchers)
110, m_sub_builder_factory(build_info.sub_builder_factory)
111, m_parent_container_ref(build_info.container)
112, m_mpi_communicator(build_info.communicator)
113, m_utils_factory(createRef<ParallelMngUtilsFactoryBase>())
114{
115 if (!m_world_parallel_mng)
116 m_world_parallel_mng = this;
117}
118
119/*---------------------------------------------------------------------------*/
120/*---------------------------------------------------------------------------*/
121
122SharedMemoryParallelMng::
123~SharedMemoryParallelMng()
124{
125 delete m_replication;
126 m_sequential_parallel_mng.reset();
127 delete m_io_mng;
128 delete m_timer_mng;
129 delete m_stat;
130}
131
132/*---------------------------------------------------------------------------*/
133/*---------------------------------------------------------------------------*/
134
135namespace
136{
137// Classe pour créer les différents dispatchers
138class DispatchCreator
139{
140 public:
141 DispatchCreator(ITraceMng* tm,SharedMemoryParallelMng* mpm,
142 ISharedMemoryMessageQueue* message_queue,
143 SharedMemoryAllDispatcher* all_dispatchers)
144 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue),
145 m_all_dispatchers(all_dispatchers){}
146 public:
147 template<typename DataType> SharedMemoryParallelDispatch<DataType>*
148 create()
149 {
150 ISharedMemoryMessageQueue* tmq = m_message_queue;
151 SharedMemoryAllDispatcher* ad = m_all_dispatchers;
152 auto& field = ad->instance((DataType*)nullptr);
153 return new SharedMemoryParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
154 }
155
156 ITraceMng* m_tm;
157 SharedMemoryParallelMng* m_mpm;
158 ISharedMemoryMessageQueue* m_message_queue;
159 SharedMemoryAllDispatcher* m_all_dispatchers;
160};
161}
162
163/*---------------------------------------------------------------------------*/
164/*---------------------------------------------------------------------------*/
165
167build()
168{
169 m_message_queue->setTraceMng(m_rank,traceMng());
170 m_timer_mng = new TimerMng(traceMng());
171
172 DispatchCreator creator(m_trace.get(),this,m_message_queue,m_all_dispatchers);
173 this->createDispatchers(creator);
174
175 m_io_mng = arcaneCreateIOMng(this);
176}
177
178/*----------------------------------------------------------------------------*/
179/*---------------------------------------------------------------------------*/
180
183{
184 Trace::Setter mci(m_trace.get(),"Thread");
185 if (m_is_initialized){
186 m_trace->warning() << "SharedMemoryParallelMng already initialized";
187 return;
188 }
189
190 m_is_initialized = true;
191}
192
193/*---------------------------------------------------------------------------*/
194/*---------------------------------------------------------------------------*/
195
198{
199 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
200}
201
204{
205 return m_utils_factory->createTransferValuesOperation(this)._release();
206}
207
210{
211 return m_utils_factory->createExchanger(this)._release();
212}
213
214/*---------------------------------------------------------------------------*/
215/*---------------------------------------------------------------------------*/
216
217/*---------------------------------------------------------------------------*/
218/*---------------------------------------------------------------------------*/
219
220void SharedMemoryParallelMng::
221sendSerializer(ISerializer* values,Int32 dest_rank)
222{
223 auto p2p_message = buildMessage(dest_rank,Parallel::Blocking);
224 Request r = m_message_queue->addSend(p2p_message,SendBufferInfo(values));
225 m_message_queue->waitAll(ArrayView<Request>(1,&r));
226}
227
228/*---------------------------------------------------------------------------*/
229/*---------------------------------------------------------------------------*/
230
231Parallel::Request SharedMemoryParallelMng::
232sendSerializer(ISerializer* values,Int32 rank,ByteArray& bytes)
233{
234 ARCANE_UNUSED(bytes);
235 return m_message_queue->addSend(buildMessage(rank,Parallel::Blocking),SendBufferInfo(values));
236}
237
238/*---------------------------------------------------------------------------*/
239/*---------------------------------------------------------------------------*/
240
243{
244 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
245}
246
247/*---------------------------------------------------------------------------*/
248/*---------------------------------------------------------------------------*/
249
250void SharedMemoryParallelMng::
251broadcastSerializer(ISerializer* values,Int32 rank)
252{
253 // Implementation basique pour l'instant.
254 // Le rank qui broadcast envoie le message à tout le monde.
255 if (m_rank==rank){
257 for( Int32 i=0; i<m_nb_rank; ++i ){
258 if (i!=m_rank){
259 requests.add(m_message_queue->addSend(buildMessage(i,Parallel::NonBlocking),SendBufferInfo(values)));
260 }
261 }
262 m_message_queue->waitAll(requests);
263 }
264 else{
265 recvSerializer(values,rank);
266 }
267}
268
269/*---------------------------------------------------------------------------*/
270/*---------------------------------------------------------------------------*/
271
272void SharedMemoryParallelMng::
273recvSerializer(ISerializer* values,Int32 rank)
274{
275 auto p2p_message = buildMessage(rank,Parallel::Blocking);
276 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(values));
277 m_message_queue->waitAll(ArrayView<Request>(1,&r));
278}
279
280/*---------------------------------------------------------------------------*/
281/*---------------------------------------------------------------------------*/
282
285{
286 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
287}
288
289/*---------------------------------------------------------------------------*/
290/*---------------------------------------------------------------------------*/
291
294{
295 ARCANE_UNUSED(requests);
296 throw NotImplementedException(A_FUNCINFO);
297}
298
299/*---------------------------------------------------------------------------*/
300/*---------------------------------------------------------------------------*/
301
304{
305 if (m_stat)
306 m_stat->print(m_trace.get());
307}
308
309/*---------------------------------------------------------------------------*/
310/*---------------------------------------------------------------------------*/
311
313barrier()
314{
315 m_thread_barrier->wait();
316}
317
318/*---------------------------------------------------------------------------*/
319/*---------------------------------------------------------------------------*/
320
323{
324 Real begin_time = platform::getRealTime();
325 m_message_queue->waitAll(requests);
326 Real end_time = platform::getRealTime();
327 m_stat->add("WaitAll",(end_time-begin_time),0);
328}
329
330/*---------------------------------------------------------------------------*/
331/*---------------------------------------------------------------------------*/
332
333ISerializeMessageList* SharedMemoryParallelMng::
334_createSerializeMessageList()
335{
336 return new MP::internal::SerializeMessageList(messagePassingMng());
337}
338
339/*---------------------------------------------------------------------------*/
340/*---------------------------------------------------------------------------*/
341
343probe(const PointToPointMessageInfo& message)
344{
345 PointToPointMessageInfo p2p_message(message);
346 p2p_message.setEmiterRank(MessageRank(m_rank));
347 return m_message_queue->probe(p2p_message);
348}
349
350/*---------------------------------------------------------------------------*/
351/*---------------------------------------------------------------------------*/
352
355{
356 PointToPointMessageInfo p2p_message(message);
357 p2p_message.setEmiterRank(MessageRank(m_rank));
358 return m_message_queue->legacyProbe(p2p_message);
359}
360
361/*---------------------------------------------------------------------------*/
362/*---------------------------------------------------------------------------*/
363
364auto SharedMemoryParallelMng::
365sendSerializer(const ISerializer* values,const PointToPointMessageInfo& message) -> Request
366{
367 auto p2p_message = buildMessage(message);
368 return m_message_queue->addSend(p2p_message,SendBufferInfo(values));
369}
370
371/*---------------------------------------------------------------------------*/
372/*---------------------------------------------------------------------------*/
373
374auto SharedMemoryParallelMng::
375receiveSerializer(ISerializer* values,const PointToPointMessageInfo& message) -> Request
376{
377 auto p2p_message = buildMessage(message);
378 return m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(values));
379}
380
381/*---------------------------------------------------------------------------*/
382/*---------------------------------------------------------------------------*/
383
386{
387 return m_utils_factory->createSynchronizer(this,family)._release();
388}
389
390/*---------------------------------------------------------------------------*/
391/*---------------------------------------------------------------------------*/
392
394createSynchronizer(const ItemGroup& group)
395{
396 return m_utils_factory->createSynchronizer(this,group)._release();
397}
398
399/*---------------------------------------------------------------------------*/
400/*---------------------------------------------------------------------------*/
401
404{
405 return m_utils_factory->createTopology(this)._release();
406}
407
408/*---------------------------------------------------------------------------*/
409/*---------------------------------------------------------------------------*/
410
412replication() const
413{
414 return m_replication;
415}
416
417/*---------------------------------------------------------------------------*/
418/*---------------------------------------------------------------------------*/
419
422{
423 delete m_replication;
424 m_replication = v;
425}
426
427/*---------------------------------------------------------------------------*/
428/*---------------------------------------------------------------------------*/
429
430IParallelMng* SharedMemoryParallelMng::
431_createSubParallelMng(Int32ConstArrayView kept_ranks)
432{
433 ARCANE_UNUSED(kept_ranks);
434 // On ne peut pas implémenter cette méthode car on passe par
435 // IParallelMngContainer::_createParallelMng() qui créé obligatoirement
436 // un 'Ref<IParallelMng>'.
437 ARCANE_THROW(NotSupportedException,"Use createSubParallelMngRef() instead");
438}
439
440/*---------------------------------------------------------------------------*/
441/*---------------------------------------------------------------------------*/
442
445{
446 if (kept_ranks.empty())
447 ARCANE_FATAL("kept_ranks is empty");
448 ARCANE_CHECK_POINTER(m_sub_builder_factory);
449
451 Int32 nb_rank = kept_ranks.size();
452
453 // Regarde si je suis dans les listes des rangs conservés et si oui
454 // détermine mon rang dans le IParallelMng créé
455 Int32 my_new_rank = (-1);
456 for( Integer i=0; i<nb_rank; ++i )
457 if (kept_ranks[i]==m_rank){
458 my_new_rank = i;
459 break;
460 }
461
462 barrier();
463 // Le rang 0 créé le builder
464 if (m_rank==0){
465 builder = m_sub_builder_factory->_createParallelMngBuilder(nb_rank,m_mpi_communicator);
466 // Positionne le builder pour tout le monde
467 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
468 }
469 barrier();
470
471 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
472 ARCANE_CHECK_POINTER(builder.get());
473
474 Ref<IParallelMng> new_parallel_mng;
475 if (my_new_rank>=0){
476 new_parallel_mng = builder->_createParallelMng(my_new_rank,traceMng());
477 //auto* new_sm = dynamic_cast<SharedMemoryParallelMng*>(new_parallel_mng.get());
478 //if (new_sm)
479 //new_sm->m_mpi_communicator = m_mpi_communicator;
480 }
481 barrier();
482 // Ici, tout le monde a créé son IParallelMng. On peut donc
483 // supprimer la référence au builder.
484 // TODO: il faudra ajouter un compteur de référence sur le builder
485 // sinon il ne sera jamais détruit.
486 if (m_rank==0){
487 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
488 }
489 barrier();
490
491 return new_parallel_mng;
492}
493
494/*---------------------------------------------------------------------------*/
495/*---------------------------------------------------------------------------*/
496
503
504/*---------------------------------------------------------------------------*/
505/*---------------------------------------------------------------------------*/
506
507Ref<IParallelMng> SharedMemoryParallelMng::
508sequentialParallelMngRef()
509{
510 return m_sequential_parallel_mng;
511}
512
515{
516 return m_sequential_parallel_mng.get();
517}
518
519/*---------------------------------------------------------------------------*/
520/*---------------------------------------------------------------------------*/
521
523buildMessage(const PointToPointMessageInfo& orig_message)
524{
525 PointToPointMessageInfo p2p_message{orig_message};
526 p2p_message.setEmiterRank(MessageRank(m_rank));
527 return p2p_message;
528}
529
530/*---------------------------------------------------------------------------*/
531/*---------------------------------------------------------------------------*/
532
533PointToPointMessageInfo SharedMemoryParallelMng::
534buildMessage(Int32 dest,Parallel::eBlockingType blocking_mode)
535{
536 return buildMessage({MessageRank(dest),blocking_mode});
537}
538
539/*---------------------------------------------------------------------------*/
540/*---------------------------------------------------------------------------*/
541
547
548/*---------------------------------------------------------------------------*/
549/*---------------------------------------------------------------------------*/
550
551} // End namespace Arcane::MessagePassing
552
553/*---------------------------------------------------------------------------*/
554/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Vue modifiable d'un tableau d'un type T.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
constexpr bool empty() const noexcept
true si le tableau est vide (size()==0)
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:42
Interface d'une famille d'entités.
Definition IItemFamily.h:84
Echange d'informations entre processeurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Interface d'un message de sérialisation entre IMessagePassingMng.
Interface d'une file de messages avec les threads.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
void setEmiterRank(MessageRank rank)
Positionne le rang de l'émetteur du message.
Informations des buffers de réception.
Requête d'un message.
Definition Request.h:77
Implémentation de IRequestList pour SharedMemoryParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
void initialize() override
Initialise le gestionnaire du parallélisme.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
IParallelReplication * replication() const override
Informations sur la réplication.
Classe de base d'une liste de requêtes.
Exception lorsqu'une fonction n'est pas implémentée.
Exception lorsqu'une opération n'est pas supportée.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Gestionnaire de timer.
Definition TimerMng.h:39
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Definition Parallel.h:50
@ WaitSome
Attend que tous les messages de la liste soient traités.
eBlockingType
Type indiquant si un message est bloquant ou non.
Implémentation de la concurrence.
ARCCORE_BASE_EXPORT Real getRealTime()
Temps Real utilisé en secondes.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:208
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:569
double Real
Type représentant un réel.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un SharedMemoryParallelMng.