Arcane  v3.14.10.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
SharedMemoryParallelMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SharedMemoryParallelMng.cc (C) 2000-2024 */
9/* */
10/* Implémentation des messages en mode mémoire partagé. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
15
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/PlatformUtils.h"
19#include "arcane/utils/TraceInfo.h"
20#include "arcane/utils/Real2.h"
21#include "arcane/utils/Real3.h"
22#include "arcane/utils/Real2x2.h"
23#include "arcane/utils/Real3x3.h"
24#include "arcane/utils/ArgumentException.h"
25#include "arcane/utils/FatalErrorException.h"
26#include "arcane/utils/ITraceMng.h"
27
28#include "arcane/parallel/IStat.h"
29
30#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
31#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
32
33#include "arcane/core/SerializeMessage.h"
34#include "arcane/core/Timer.h"
35#include "arcane/core/IIOMng.h"
36#include "arcane/core/ISerializeMessageList.h"
37#include "arcane/core/IItemFamily.h"
38
39#include "arcane/impl/TimerMng.h"
40#include "arcane/impl/ParallelReplication.h"
41#include "arcane/impl/ParallelMngUtilsFactoryBase.h"
42
43#include "arccore/message_passing/RequestListBase.h"
44#include "arccore/message_passing/SerializeMessageList.h"
45
46#include <map>
47
48/*---------------------------------------------------------------------------*/
49/*---------------------------------------------------------------------------*/
50
51namespace Arcane
52{
53extern "C++" ARCANE_IMPL_EXPORT IIOMng*
54arcaneCreateIOMng(IParallelMng* psm);
55}
56
58{
59
60/*---------------------------------------------------------------------------*/
61/*---------------------------------------------------------------------------*/
67{
69 public:
70
72 : m_parallel_mng(pm), m_message_queue(pm->m_message_queue),
73 m_local_rank(m_parallel_mng->commRank()){}
74 public:
76 {
77 switch(wait_type){
78 case Parallel::WaitAll:
79 return m_message_queue->waitAll(_requests());
80 case Parallel::WaitSome:
81 return m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),false);
82 case Parallel::WaitSomeNonBlocking:
83 return m_message_queue->waitSome(m_local_rank,_requests(),_requestsDone(),true);
84 }
85 }
86 private:
87 SharedMemoryParallelMng* m_parallel_mng;
88 ISharedMemoryMessageQueue* m_message_queue;
89 Int32 m_local_rank;
90};
91
92/*---------------------------------------------------------------------------*/
93/*---------------------------------------------------------------------------*/
94
95SharedMemoryParallelMng::
96SharedMemoryParallelMng(const SharedMemoryParallelMngBuildInfo& build_info)
98, m_trace(build_info.trace_mng)
99, m_thread_mng(build_info.thread_mng)
100, m_sequential_parallel_mng(build_info.sequential_parallel_mng)
101, m_timer_mng(nullptr)
102, m_replication(new ParallelReplication())
103, m_world_parallel_mng(build_info.world_parallel_mng)
104, m_io_mng(nullptr)
105, m_message_queue(build_info.message_queue)
106, m_is_parallel(build_info.nb_rank!=1)
107, m_rank(build_info.rank)
108, m_nb_rank(build_info.nb_rank)
109, m_is_initialized(false)
110, m_stat(Parallel::createDefaultStat())
111, m_thread_barrier(build_info.thread_barrier)
112, m_all_dispatchers(build_info.all_dispatchers)
113, m_sub_builder_factory(build_info.sub_builder_factory)
114, m_parent_container_ref(build_info.container)
115, m_mpi_communicator(build_info.communicator)
116, m_utils_factory(createRef<ParallelMngUtilsFactoryBase>())
117{
118 if (!m_world_parallel_mng)
119 m_world_parallel_mng = this;
120}
121
122/*---------------------------------------------------------------------------*/
123/*---------------------------------------------------------------------------*/
124
125SharedMemoryParallelMng::
126~SharedMemoryParallelMng()
127{
128 delete m_replication;
129 m_sequential_parallel_mng.reset();
130 delete m_io_mng;
131 delete m_timer_mng;
132 delete m_stat;
133}
134
135/*---------------------------------------------------------------------------*/
136/*---------------------------------------------------------------------------*/
137
138namespace
139{
140// Classe pour créer les différents dispatchers
141class DispatchCreator
142{
143 public:
144 DispatchCreator(ITraceMng* tm,SharedMemoryParallelMng* mpm,
145 ISharedMemoryMessageQueue* message_queue,
146 SharedMemoryAllDispatcher* all_dispatchers)
147 : m_tm(tm), m_mpm(mpm), m_message_queue(message_queue),
148 m_all_dispatchers(all_dispatchers){}
149 public:
150 template<typename DataType> SharedMemoryParallelDispatch<DataType>*
151 create()
152 {
153 ISharedMemoryMessageQueue* tmq = m_message_queue;
154 SharedMemoryAllDispatcher* ad = m_all_dispatchers;
155 auto& field = ad->instance((DataType*)nullptr);
156 return new SharedMemoryParallelDispatch<DataType>(m_tm,m_mpm,tmq,field);
157 }
158
159 ITraceMng* m_tm;
160 SharedMemoryParallelMng* m_mpm;
161 ISharedMemoryMessageQueue* m_message_queue;
162 SharedMemoryAllDispatcher* m_all_dispatchers;
163};
164}
165
166/*---------------------------------------------------------------------------*/
167/*---------------------------------------------------------------------------*/
168
170build()
171{
172 m_message_queue->setTraceMng(m_rank,traceMng());
173 m_timer_mng = new TimerMng(traceMng());
174
175 DispatchCreator creator(m_trace.get(),this,m_message_queue,m_all_dispatchers);
176 this->createDispatchers(creator);
177
178 m_io_mng = arcaneCreateIOMng(this);
179}
180
181/*----------------------------------------------------------------------------*/
182/*---------------------------------------------------------------------------*/
183
186{
187 Trace::Setter mci(m_trace.get(),"Thread");
188 if (m_is_initialized){
189 m_trace->warning() << "SharedMemoryParallelMng already initialized";
190 return;
191 }
192
193 m_is_initialized = true;
194}
195
196/*---------------------------------------------------------------------------*/
197/*---------------------------------------------------------------------------*/
198
201{
202 return m_utils_factory->createGetVariablesValuesOperation(this)._release();
203}
204
207{
208 return m_utils_factory->createTransferValuesOperation(this)._release();
209}
210
213{
214 return m_utils_factory->createExchanger(this)._release();
215}
216
217/*---------------------------------------------------------------------------*/
218/*---------------------------------------------------------------------------*/
219
220/*---------------------------------------------------------------------------*/
221/*---------------------------------------------------------------------------*/
222
223void SharedMemoryParallelMng::
224sendSerializer(ISerializer* values,Int32 dest_rank)
225{
226 auto p2p_message = buildMessage(dest_rank,Parallel::Blocking);
227 Request r = m_message_queue->addSend(p2p_message,SendBufferInfo(values));
228 m_message_queue->waitAll(ArrayView<Request>(1,&r));
229}
230
231/*---------------------------------------------------------------------------*/
232/*---------------------------------------------------------------------------*/
233
234Parallel::Request SharedMemoryParallelMng::
235sendSerializer(ISerializer* values,Int32 rank,ByteArray& bytes)
236{
237 ARCANE_UNUSED(bytes);
238 return m_message_queue->addSend(buildMessage(rank,Parallel::Blocking),SendBufferInfo(values));
239}
240
241/*---------------------------------------------------------------------------*/
242/*---------------------------------------------------------------------------*/
243
245createSendSerializer(Int32 rank)
246{
247 return new SerializeMessage(m_rank,rank,ISerializeMessage::MT_Send);
248}
249
250/*---------------------------------------------------------------------------*/
251/*---------------------------------------------------------------------------*/
252
253void SharedMemoryParallelMng::
254broadcastSerializer(ISerializer* values,Int32 rank)
255{
256 // Implementation basique pour l'instant.
257 // Le rank qui broadcast envoie le message à tout le monde.
258 if (m_rank==rank){
260 for( Int32 i=0; i<m_nb_rank; ++i ){
261 if (i!=m_rank){
262 requests.add(m_message_queue->addSend(buildMessage(i,Parallel::NonBlocking),SendBufferInfo(values)));
263 }
264 }
265 m_message_queue->waitAll(requests);
266 }
267 else{
268 recvSerializer(values,rank);
269 }
270}
271
272/*---------------------------------------------------------------------------*/
273/*---------------------------------------------------------------------------*/
274
275void SharedMemoryParallelMng::
276recvSerializer(ISerializer* values,Int32 rank)
277{
278 auto p2p_message = buildMessage(rank,Parallel::Blocking);
279 Request r = m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(values));
280 m_message_queue->waitAll(ArrayView<Request>(1,&r));
281}
282
283/*---------------------------------------------------------------------------*/
284/*---------------------------------------------------------------------------*/
285
287createReceiveSerializer(Int32 rank)
288{
289 return new SerializeMessage(m_rank,rank,ISerializeMessage::MT_Recv);
290}
291
292/*---------------------------------------------------------------------------*/
293/*---------------------------------------------------------------------------*/
294
297{
298 ARCANE_UNUSED(requests);
299 throw NotImplementedException(A_FUNCINFO);
300}
301
302/*---------------------------------------------------------------------------*/
303/*---------------------------------------------------------------------------*/
304
307{
308 if (m_stat)
309 m_stat->print(m_trace.get());
310}
311
312/*---------------------------------------------------------------------------*/
313/*---------------------------------------------------------------------------*/
314
316barrier()
317{
318 m_thread_barrier->wait();
319}
320
321/*---------------------------------------------------------------------------*/
322/*---------------------------------------------------------------------------*/
323
326{
327 Real begin_time = platform::getRealTime();
328 m_message_queue->waitAll(requests);
329 Real end_time = platform::getRealTime();
330 m_stat->add("WaitAll",(end_time-begin_time),0);
331}
332
333/*---------------------------------------------------------------------------*/
334/*---------------------------------------------------------------------------*/
335
336ISerializeMessageList* SharedMemoryParallelMng::
337_createSerializeMessageList()
338{
339 return new MP::internal::SerializeMessageList(messagePassingMng());
340}
341
342/*---------------------------------------------------------------------------*/
343/*---------------------------------------------------------------------------*/
344
346probe(const PointToPointMessageInfo& message)
347{
349 p2p_message.setEmiterRank(MessageRank(m_rank));
350 return m_message_queue->probe(p2p_message);
351}
352
353/*---------------------------------------------------------------------------*/
354/*---------------------------------------------------------------------------*/
355
358{
360 p2p_message.setEmiterRank(MessageRank(m_rank));
361 return m_message_queue->legacyProbe(p2p_message);
362}
363
364/*---------------------------------------------------------------------------*/
365/*---------------------------------------------------------------------------*/
366
367auto SharedMemoryParallelMng::
368sendSerializer(const ISerializer* values,const PointToPointMessageInfo& message) -> Request
369{
370 auto p2p_message = buildMessage(message);
371 return m_message_queue->addSend(p2p_message,SendBufferInfo(values));
372}
373
374/*---------------------------------------------------------------------------*/
375/*---------------------------------------------------------------------------*/
376
377auto SharedMemoryParallelMng::
378receiveSerializer(ISerializer* values,const PointToPointMessageInfo& message) -> Request
379{
380 auto p2p_message = buildMessage(message);
381 return m_message_queue->addReceive(p2p_message,ReceiveBufferInfo(values));
382}
383
384/*---------------------------------------------------------------------------*/
385/*---------------------------------------------------------------------------*/
386
389{
390 return m_utils_factory->createSynchronizer(this,family)._release();
391}
392
393/*---------------------------------------------------------------------------*/
394/*---------------------------------------------------------------------------*/
395
397createSynchronizer(const ItemGroup& group)
398{
399 return m_utils_factory->createSynchronizer(this,group)._release();
400}
401
402/*---------------------------------------------------------------------------*/
403/*---------------------------------------------------------------------------*/
404
407{
408 return m_utils_factory->createTopology(this)._release();
409}
410
411/*---------------------------------------------------------------------------*/
412/*---------------------------------------------------------------------------*/
413
415replication() const
416{
417 return m_replication;
418}
419
420/*---------------------------------------------------------------------------*/
421/*---------------------------------------------------------------------------*/
422
425{
426 delete m_replication;
427 m_replication = v;
428}
429
430/*---------------------------------------------------------------------------*/
431/*---------------------------------------------------------------------------*/
432
433IParallelMng* SharedMemoryParallelMng::
434_createSubParallelMng(Int32ConstArrayView kept_ranks)
435{
436 ARCANE_UNUSED(kept_ranks);
437 // On ne peut pas implémenter cette méthode car on passe par
438 // IParallelMngContainer::_createParallelMng() qui créé obligatoirement
439 // un 'Ref<IParallelMng>'.
440 ARCANE_THROW(NotSupportedException,"Use createSubParallelMngRef() instead");
441}
442
443/*---------------------------------------------------------------------------*/
444/*---------------------------------------------------------------------------*/
445
448{
449 if (kept_ranks.empty())
450 ARCANE_FATAL("kept_ranks is empty");
451 ARCANE_CHECK_POINTER(m_sub_builder_factory);
452
454 Int32 nb_rank = kept_ranks.size();
455
456 // Regarde si je suis dans les listes des rangs conservés et si oui
457 // détermine mon rang dans le IParallelMng créé
458 Int32 my_new_rank = (-1);
459 for( Integer i=0; i<nb_rank; ++i )
460 if (kept_ranks[i]==m_rank){
461 my_new_rank = i;
462 break;
463 }
464
465 barrier();
466 // Le rang 0 créé le builder
467 if (m_rank==0){
468 builder = m_sub_builder_factory->_createParallelMngBuilder(nb_rank,m_mpi_communicator);
469 // Positionne le builder pour tout le monde
470 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
471 }
472 barrier();
473
474 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
475 ARCANE_CHECK_POINTER(builder.get());
476
478 if (my_new_rank>=0){
479 new_parallel_mng = builder->_createParallelMng(my_new_rank,traceMng());
480 //auto* new_sm = dynamic_cast<SharedMemoryParallelMng*>(new_parallel_mng.get());
481 //if (new_sm)
482 //new_sm->m_mpi_communicator = m_mpi_communicator;
483 }
484 barrier();
485 // Ici, tout le monde a créé son IParallelMng. On peut donc
486 // supprimer la référence au builder.
487 // TODO: il faudra ajouter un compteur de référence sur le builder
488 // sinon il ne sera jamais détruit.
489 if (m_rank==0){
490 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
491 }
492 barrier();
493
494 return new_parallel_mng;
495}
496
497/*---------------------------------------------------------------------------*/
498/*---------------------------------------------------------------------------*/
499
506
507/*---------------------------------------------------------------------------*/
508/*---------------------------------------------------------------------------*/
509
510Ref<IParallelMng> SharedMemoryParallelMng::
511sequentialParallelMngRef()
512{
513 return m_sequential_parallel_mng;
514}
515
518{
519 return m_sequential_parallel_mng.get();
520}
521
522/*---------------------------------------------------------------------------*/
523/*---------------------------------------------------------------------------*/
524
527{
529 p2p_message.setEmiterRank(MessageRank(m_rank));
530 return p2p_message;
531}
532
533/*---------------------------------------------------------------------------*/
534/*---------------------------------------------------------------------------*/
535
541
542/*---------------------------------------------------------------------------*/
543/*---------------------------------------------------------------------------*/
544
550
551/*---------------------------------------------------------------------------*/
552/*---------------------------------------------------------------------------*/
553
554} // End namespace Arcane::MessagePassing
555
556/*---------------------------------------------------------------------------*/
557/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Tableau d'items de types quelconques.
Opérations pour accéder aux valeurs de variables d'un autre sous-domaine.
Interface d'une famille d'entités.
Echange d'informations entre processeurs.
virtual Ref< IParallelMngContainer > _createParallelMngBuilder(Int32 nb_local_rank, Parallel::Communicator communicator)=0
Créé un conteneur pour nb_local_rank rangs locaux et avec comme communicateur communicator.
Interface du gestionnaire de parallélisme pour un sous-domaine.
Informations sur la réplication des sous-domaines en parallèle.
Informations sur la topologie d'allocation des coeurs de calcul.
Envoie de valeurs sur différents processeurs.
Interface d'un service de synchronisation de variable.
Groupe d'entités de maillage.
Definition ItemGroup.h:49
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:120
Interface d'une file de messages avec les threads.
Implémentation de IRequestList pour SharedMemoryParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Effectue l'attente ou le test.
Gestionnaire du parallélisme utilisant les threads.
IParallelTopology * createTopology() override
Créé une instance contenant les infos sur la topologie des rangs de ce gestionnnaire.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Retourne une opération pour récupérer les valeurs d'une variable sur les entités d'un autre sous-doma...
Ref< Parallel::IRequestList > createRequestListRef() override
Créé une liste de requêtes pour ce gestionnaire.
void printStats() override
Affiche des statistiques liées à ce gestionnaire du parallélisme.
void initialize() override
Initialise le gestionnaire du parallélisme.
void setReplication(IParallelReplication *v) override
Positionne les Informations sur la réplication.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Construit un message avec pour destinataire dest.
void waitAllRequests(ArrayView< Request > requests) override
Bloque en attendant que les requêtes rvalues soient terminées.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Fabrique des fonctions utilitaires.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Retourne une interface pour synchroniser des variables sur le groupe de la famille family.
MessageId probe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
ISerializeMessage * createSendSerializer(Int32 rank) override
Créé un message non bloquant pour envoyer des données sérialisées au rang rank.
ITraceMng * traceMng() const override
Gestionnaire de traces.
IParallelMng * sequentialParallelMng() override
Retourne un gestionnaire de parallélisme séquentiel.
IParallelExchanger * createExchanger() override
Retourne une interface pour transférer des messages entre processeurs.
void freeRequests(ArrayView< Parallel::Request > requests) override
Libère les requêtes.
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Créé un message non bloquant pour recevoir des données sérialisées du rang rank.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Créé un nouveau gestionnaire de parallélisme pour un sous-ensemble des rangs.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Retourne une opération pour transférer des valeurs entre sous-domaine.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Sonde si des messages sont disponibles.
IParallelReplication * replication() const override
Informations sur la réplication.
Redirige la gestion des messages des sous-domaines suivant le type de l'argument.
IMessagePassingMng * messagePassingMng() const override
Gestionnaire de message de Arccore associé
Classe de base d'une fabrique pour les fonctions utilitaires de IParallelMng.
Informations sur la réplication des sous-domaines en parallèle.
virtual void add(const String &name, double elapsed_time, Int64 msg_size)=0
Ajoute une statistique.
virtual void print(ITraceMng *trace)=0
Imprime sur trace les statistiques.
Message utilisant un SerializeBuffer.
Gestionnaire de timer.
Definition TimerMng.h:39
Vue constante d'un tableau de type T.
virtual bool wait()=0
Bloque et attend que tous les threads appellent cette méthode.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
Requête d'un message.
Definition Request.h:77
Classe de base d'une liste de requêtes.
Exception lorsqu'une fonction n'est pas implémentée.
Exception lorsqu'une opération n'est pas supportée.
TraceMessage warning() const
Flot pour un message d'avertissement.
Positionne une classe de message.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Definition Parallel.h:94
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
eBlockingType
Type indiquant si un message est bloquant ou non.
Infos pour construire un SharedMemoryParallelMng.