Arcane  v3.16.7.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
SharedMemoryParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SharedMemoryParallelMng.cc (C) 2000-2025 */
9/* */
10/* Implémentation des messages en mode mémoire partagé. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
15
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/PlatformUtils.h"
19#include "arcane/utils/TraceInfo.h"
20#include "arcane/utils/NumericTypes.h"
21#include "arcane/utils/ArgumentException.h"
22#include "arcane/utils/FatalErrorException.h"
23#include "arcane/utils/ITraceMng.h"
24
25#include "arcane/parallel/IStat.h"
26
27#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
28#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
29#include "arcane/parallel/thread/internal/SharedMemoryMachineMemoryWindowBaseInternalCreator.h"
30#include "arcane/parallel/thread/internal/SharedMemoryMachineMemoryWindowBaseInternal.h"
31
32#include "arcane/core/Timer.h"
33#include "arcane/core/IIOMng.h"
34#include "arcane/core/ISerializeMessageList.h"
35#include "arcane/core/IItemFamily.h"
36#include "arcane/core/internal/SerializeMessage.h"
37#include "arcane/core/internal/ParallelMngInternal.h"
38
39#include "arcane/impl/TimerMng.h"
40#include "arcane/impl/ParallelReplication.h"
41#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
42
43#include "arccore/message_passing/RequestListBase.h"
44#include "arccore/message_passing/SerializeMessageList.h"
45
46#include <map>
47
48/*---------------------------------------------------------------------------*/
49/*---------------------------------------------------------------------------*/
50
51namespace Arcane
52{
53extern "C++" ARCANE_IMPL_EXPORT IIOMng*
54arcaneCreateIOMng(IParallelMng* psm);
55}
56
58{
59
60/*---------------------------------------------------------------------------*/
61/*---------------------------------------------------------------------------*/
67{
69 public:
70
71 explicit RequestList(SharedMemoryParallelMng* pm)
72 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
73 m_local_rank(m_parallel_mng->commRank()){}
74 public:
75 void _wait(Parallel::eWaitType wait_type) override
76 {
77 switch(wait_type){
78 case Parallel::WaitAll:
79 return m_message_queue->waitAll(_requests());
81 return m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),false);
83 return m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),true);
84 }
85 }
86 private:
87 SharedMemoryParallelMng* m_parallel_mng;
88 ISharedMemoryMessageQueue* m_message_queue;
89 Int32 m_local_rank;
90};
91
92/*---------------------------------------------------------------------------*/
93/*---------------------------------------------------------------------------*/
94
96: public ParallelMngInternal
97{
98 public:
99
100 explicit Impl(SharedMemoryParallelMng* pm, SharedMemoryMachineMemoryWindowBaseInternalCreator* window_creator)
101 : ParallelMngInternal(pm)
102 , m_parallel_mng(pm)
103 , m_window_creator(window_creator)
104 {}
105
106 ~Impl() override = default;
107
108 public:
109
111 {
112 return makeRef(m_window_creator->createWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type));
113 }
114
115 private:
116
117 SharedMemoryParallelMng* m_parallel_mng;
119};
120
121/*---------------------------------------------------------------------------*/
122/*---------------------------------------------------------------------------*/
123
124SharedMemoryParallelMng::
125SharedMemoryParallelMng(const SharedMemoryParallelMngBuildInfo& build_info)
126: ParallelMngDispatcher(ParallelMngDispatcherBuildInfo(build_info.rank,build_info.nb_rank))
127, m_trace(build_info.trace_mng)
128, m_thread_mng(build_info.thread_mng)
129, m_sequential_parallel_mng(build_info.sequential_parallel_mng)
130, m_timer_mng(nullptr)
131, m_replication(new ParallelReplication())
132, m_world_parallel_mng(build_info.world_parallel_mng)
133, m_io_mng(nullptr)
134, m_message_queue(build_info.message_queue)
135, m_is_parallel(build_info.nb_rank!=1)
136, m_rank(build_info.rank)
137, m_nb_rank(build_info.nb_rank)
138, m_is_initialized(false)
139, m_stat(Parallel::createDefaultStat())
140, m_thread_barrier(build_info.thread_barrier)
141, m_all_dispatchers(build_info.all_dispatchers)
142, m_sub_builder_factory(build_info.sub_builder_factory)
143, m_parent_container_ref(build_info.container)
144, m_mpi_communicator(build_info.communicator)
145, m_utils_factory(createRef<ParallelMngUtilsFactoryBase>())
146, m_parallel_mng_internal(new Impl(this, build_info.window_creator))
147{
148 if (!m_world_parallel_mng)
149 m_world_parallel_mng = this;
150}
151
152/*---------------------------------------------------------------------------*/
153/*---------------------------------------------------------------------------*/
154
155SharedMemoryParallelMng::
156~SharedMemoryParallelMng()
157{
158 delete m_parallel_mng_internal;
159 delete m_replication;
160 m_sequential_parallel_mng.reset();
161 delete m_io_mng;
162 delete m_timer_mng;
163 delete m_stat;
164}
165
166/*---------------------------------------------------------------------------*/
167/*---------------------------------------------------------------------------*/
168
169namespace
170{
171// Classe pour créer les différents dispatchers
172class DispatchCreator
173{
174 public:
175 DispatchCreator(ITraceMng* tm,SharedMemoryParallelMng* mpm,
176 ISharedMemoryMessageQueue* message_queue,
177 SharedMemoryAllDispatcher* all_dispatchers)
178 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue),
179 m_all_dispatchers(all_dispatchers){}
180 public:
181 template<typename DataType> SharedMemoryParallelDispatch<DataType>*
182 create()
183 {
184 ISharedMemoryMessageQueue* tmq = m_message_queue;
185 SharedMemoryAllDispatcher* ad = m_all_dispatchers;
186 auto& field = ad->instance((DataType*)nullptr);
187 return new SharedMemoryParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
188 }
189
190 ITraceMng* m_tm;
191 SharedMemoryParallelMng* m_mpm;
192 ISharedMemoryMessageQueue* m_message_queue;
193 SharedMemoryAllDispatcher* m_all_dispatchers;
194};
195}
196
197/*---------------------------------------------------------------------------*/
198/*---------------------------------------------------------------------------*/
199
201build()
202{
203 m_message_queue->setTraceMng(m_rank,traceMng());
204 m_timer_mng = new TimerMng(traceMng());
205
206 DispatchCreator creator(m_trace.get(),this,m_message_queue,m_all_dispatchers);
207 this->createDispatchers(creator);
208
209 m_io_mng = arcaneCreateIOMng(this);
210}
211
212/*----------------------------------------------------------------------------*/
213/*---------------------------------------------------------------------------*/
214
217{
218 Trace::Setter mci(m_trace.get(),"Thread");
219 if (m_is_initialized){
220 m_trace->warning() << "SharedMemoryParallelMng already initialized";
221 return;
222 }
223
224 m_is_initialized = true;
225}
226
227/*---------------------------------------------------------------------------*/
228/*---------------------------------------------------------------------------*/
229
232{
233 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
234}
235
238{
239 return m_utils_factory->createTransferValuesOperation(this)._release();
240}
241
244{
245 return m_utils_factory->createExchanger(this)._release();
246}
247
248/*---------------------------------------------------------------------------*/
249/*---------------------------------------------------------------------------*/
250
251/*---------------------------------------------------------------------------*/
252/*---------------------------------------------------------------------------*/
253
254void SharedMemoryParallelMng::
255sendSerializer(ISerializer* values,Int32 dest_rank)
256{
257 auto p2p_message = buildMessage(dest_rank,Parallel::Blocking);
258 Request r = m_message_queue->addSend(p2p_message,SendBufferInfo(values));
259 m_message_queue->waitAll(ArrayView<Request>(1,&r));
260}
261
262/*---------------------------------------------------------------------------*/
263/*---------------------------------------------------------------------------*/
264
265Parallel::Request SharedMemoryParallelMng::
266sendSerializer(ISerializer* values,Int32 rank,ByteArray& bytes)
267{
268 ARCANE_UNUSED(bytes);
269 return m_message_queue->addSend(buildMessage(rank,Parallel::Blocking),SendBufferInfo(values));
270}
271
272/*---------------------------------------------------------------------------*/
273/*---------------------------------------------------------------------------*/
274
277{
278 return m_utils_factory->createSendSerializeMessage(this, rank)._release();
279}
280
281/*---------------------------------------------------------------------------*/
282/*---------------------------------------------------------------------------*/
283
284void SharedMemoryParallelMng::
285broadcastSerializer(ISerializer* values,Int32 rank)
286{
287 // Implementation basique pour l'instant.
288 // Le rank qui broadcast envoie le message à tout le monde.
289 if (m_rank==rank){
291 for( Int32 i=0; i<m_nb_rank; ++i ){
292 if (i!=m_rank){
293 requests.add(m_message_queue->addSend(buildMessage(i,Parallel::NonBlocking),SendBufferInfo(values)));
294 }
295 }
296 m_message_queue->waitAll(requests);
297 }
298 else{
299 recvSerializer(values,rank);
300 }
301}
302
303/*---------------------------------------------------------------------------*/
304/*---------------------------------------------------------------------------*/
305
306void SharedMemoryParallelMng::
307recvSerializer(ISerializer* values,Int32 rank)
308{
309 auto p2p_message = buildMessage(rank,Parallel::Blocking);
310 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(values));
311 m_message_queue->waitAll(ArrayView<Request>(1,&r));
312}
313
314/*---------------------------------------------------------------------------*/
315/*---------------------------------------------------------------------------*/
316
319{
320 return m_utils_factory->createReceiveSerializeMessage(this, rank)._release();
321}
322
323/*---------------------------------------------------------------------------*/
324/*---------------------------------------------------------------------------*/
325
328{
329 ARCANE_UNUSED(requests);
330 throw NotImplementedException(A_FUNCINFO);
331}
332
333/*---------------------------------------------------------------------------*/
334/*---------------------------------------------------------------------------*/
335
338{
339 if (m_stat)
340 m_stat->print(m_trace.get());
341}
342
343/*---------------------------------------------------------------------------*/
344/*---------------------------------------------------------------------------*/
345
347barrier()
348{
349 m_thread_barrier->wait();
350}
351
352/*---------------------------------------------------------------------------*/
353/*---------------------------------------------------------------------------*/
354
357{
358 Real begin_time = platform::getRealTime();
359 m_message_queue->waitAll(requests);
360 Real end_time = platform::getRealTime();
361 m_stat->add("WaitAll",(end_time-begin_time),0);
362}
363
364/*---------------------------------------------------------------------------*/
365/*---------------------------------------------------------------------------*/
366
367ISerializeMessageList* SharedMemoryParallelMng::
368_createSerializeMessageList()
369{
370 return new MP::internal::SerializeMessageList(messagePassingMng());
371}
372
373/*---------------------------------------------------------------------------*/
374/*---------------------------------------------------------------------------*/
375
377probe(const PointToPointMessageInfo& message)
378{
379 PointToPointMessageInfo p2p_message(message);
380 p2p_message.setEmiterRank(MessageRank(m_rank));
381 return m_message_queue->probe(p2p_message);
382}
383
384/*---------------------------------------------------------------------------*/
385/*---------------------------------------------------------------------------*/
386
389{
390 PointToPointMessageInfo p2p_message(message);
391 p2p_message.setEmiterRank(MessageRank(m_rank));
392 return m_message_queue->legacyProbe(p2p_message);
393}
394
395/*---------------------------------------------------------------------------*/
396/*---------------------------------------------------------------------------*/
397
398auto SharedMemoryParallelMng::
399sendSerializer(const ISerializer* values,const PointToPointMessageInfo& message) -> Request
400{
401 auto p2p_message = buildMessage(message);
402 return m_message_queue->addSend(p2p_message,SendBufferInfo(values));
403}
404
405/*---------------------------------------------------------------------------*/
406/*---------------------------------------------------------------------------*/
407
408auto SharedMemoryParallelMng::
409receiveSerializer(ISerializer* values,const PointToPointMessageInfo& message) -> Request
410{
411 auto p2p_message = buildMessage(message);
412 return m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(values));
413}
414
415/*---------------------------------------------------------------------------*/
416/*---------------------------------------------------------------------------*/
417
420{
421 return m_utils_factory->createSynchronizer(this,family)._release();
422}
423
424/*---------------------------------------------------------------------------*/
425/*---------------------------------------------------------------------------*/
426
428createSynchronizer(const ItemGroup& group)
429{
430 return m_utils_factory->createSynchronizer(this,group)._release();
431}
432
433/*---------------------------------------------------------------------------*/
434/*---------------------------------------------------------------------------*/
435
438{
439 return m_utils_factory->createTopology(this)._release();
440}
441
442/*---------------------------------------------------------------------------*/
443/*---------------------------------------------------------------------------*/
444
446replication() const
447{
448 return m_replication;
449}
450
451/*---------------------------------------------------------------------------*/
452/*---------------------------------------------------------------------------*/
453
456{
457 delete m_replication;
458 m_replication = v;
459}
460
461/*---------------------------------------------------------------------------*/
462/*---------------------------------------------------------------------------*/
463
464IParallelMng* SharedMemoryParallelMng::
465_createSubParallelMng(Int32ConstArrayView kept_ranks)
466{
467 ARCANE_UNUSED(kept_ranks);
468 // On ne peut pas implémenter cette méthode car on passe par
469 // IParallelMngContainer::_createParallelMng() qui créé obligatoirement
470 // un 'Ref<IParallelMng>'.
471 ARCANE_THROW(NotSupportedException,"Use createSubParallelMngRef() instead");
472}
473
474/*---------------------------------------------------------------------------*/
475/*---------------------------------------------------------------------------*/
476
479{
480 if (kept_ranks.empty())
481 ARCANE_FATAL("kept_ranks is empty");
482 ARCANE_CHECK_POINTER(m_sub_builder_factory);
483
485 Int32 nb_rank = kept_ranks.size();
486
487 // Regarde si je suis dans les listes des rangs conservés et si oui
488 // détermine mon rang dans le IParallelMng créé
489 Int32 my_new_rank = (-1);
490 for( Integer i=0; i<nb_rank; ++i )
491 if (kept_ranks[i]==m_rank){
492 my_new_rank = i;
493 break;
494 }
495
496 barrier();
497 // Le rang 0 créé le builder
498 if (m_rank==0){
499 builder = m_sub_builder_factory->_createParallelMngBuilder(nb_rank,m_mpi_communicator);
500 // Positionne le builder pour tout le monde
501 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
502 }
503 barrier();
504
505 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
506 ARCANE_CHECK_POINTER(builder.get());
507
508 Ref<IParallelMng> new_parallel_mng;
509 if (my_new_rank>=0){
510 new_parallel_mng = builder->_createParallelMng(my_new_rank,traceMng());
511 //auto* new_sm = dynamic_cast<SharedMemoryParallelMng*>(new_parallel_mng.get());
512 //if (new_sm)
513 //new_sm->m_mpi_communicator = m_mpi_communicator;
514 }
515 barrier();
516 // Ici, tout le monde a créé son IParallelMng. On peut donc
517 // supprimer la référence au builder.
518 // TODO: il faudra ajouter un compteur de référence sur le builder
519 // sinon il ne sera jamais détruit.
520 if (m_rank==0){
521 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
522 }
523 barrier();
524
525 return new_parallel_mng;
526}
527
528/*---------------------------------------------------------------------------*/
529/*---------------------------------------------------------------------------*/
530
537
538/*---------------------------------------------------------------------------*/
539/*---------------------------------------------------------------------------*/
540
541Ref<IParallelMng> SharedMemoryParallelMng::
542sequentialParallelMngRef()
543{
544 return m_sequential_parallel_mng;
545}
546
549{
550 return m_sequential_parallel_mng.get();
551}
552
553/*---------------------------------------------------------------------------*/
554/*---------------------------------------------------------------------------*/
555
557buildMessage(const PointToPointMessageInfo& orig_message)
558{
559 PointToPointMessageInfo p2p_message{orig_message};
560 p2p_message.setEmiterRank(MessageRank(m_rank));
561 return p2p_message;
562}
563
564/*---------------------------------------------------------------------------*/
565/*---------------------------------------------------------------------------*/
566
567PointToPointMessageInfo SharedMemoryParallelMng::
568buildMessage(Int32 dest,Parallel::eBlockingType blocking_mode)
569{
570 return buildMessage({MessageRank(dest),blocking_mode});
571}
572
573/*---------------------------------------------------------------------------*/
574/*---------------------------------------------------------------------------*/
575
581
582/*---------------------------------------------------------------------------*/
583/*---------------------------------------------------------------------------*/
584
585} // End namespace Arcane::MessagePassing
586
587/*---------------------------------------------------------------------------*/
588/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Vue modifiable d'un tableau d'un type T.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
constexpr bool empty() const noexcept
true si le tableau est vide (size()==0)
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:36
Interface d'une famille d'entités.
Definition IItemFamily.h:84
Échange d'informations entre processeurs.
Interface du gestionnaire de parallélisme pour un sous-domaine.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Interface d'un message de sérialisation entre IMessagePassingMng.
Interface d'une file de messages avec les threads.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
void setEmiterRank(MessageRank rank)
Positionne le rang de l'émetteur du message.
Informations des buffers de réception.
Requête d'un message.
Definition Request.h:77
Ref< IMachineMemoryWindowBaseInternal > createMachineMemoryWindowBase(Int64 sizeof_segment, Int32 sizeof_type) override
Méthode permettant de créer une fenêtre mémoire sur le noeud.
Implémentation de IRequestList pour SharedMemoryParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
void initialize() override
Initialise le gestionnaire du parallélisme.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
IParallelReplication * replication() const override
Informations sur la réplication.
Classe de base d'une liste de requêtes.
Exception lorsqu'une fonction n'est pas implémentée.
Exception lorsqu'une opération n'est pas supportée.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
Gestionnaire de timer.
Definition TimerMng.h:39
Positionne une classe de message.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
@ WaitSome
Attend que tous les messages de la liste soient traités.
eBlockingType
Type indiquant si un message est bloquant ou non.
Implémentation de la concurrence.
ARCCORE_BASE_EXPORT Real getRealTime()
Temps Real utilisé en secondes.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Ref< TrueType > createRef(Args &&... args)
Créé une instance de type TrueType avec les arguments Args et retourne une référence dessus.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
Array< Byte > ByteArray
Tableau dynamique à une dimension de caractères.
Definition UtilsTypes.h:208
ConstArrayView< Int32 > Int32ConstArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:569
double Real
Type représentant un réel.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un SharedMemoryParallelMng.