Arcane  v3.16.7.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
SharedMemoryParallelSuperMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SharedMemoryParallelSuperMng.cc (C) 2000-2025 */
9/* */
10/* Gestionnaire de messages utilisant uniquement la mémoire partagée. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/thread/SharedMemoryParallelSuperMng.h"
15
16#include "arcane/utils/ApplicationInfo.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/NotImplementedException.h"
19#include "arcane/utils/FatalErrorException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/ValueConvert.h"
22#include "arcane/utils/IThreadMng.h"
23#include "arcane/utils/TraceClassConfig.h"
24#include "arcane/utils/ITraceMng.h"
25#include "arcane/utils/IThreadImplementation.h"
26#include "arcane/utils/Mutex.h"
27
28#include "arcane/parallel/IStat.h"
29
30#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
31#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
32#include "arcane/parallel/thread/SharedMemoryMessageQueue.h"
33#include "arcane/parallel/thread/internal/SharedMemoryThreadMng.h"
34#include "arcane/parallel/thread/internal/SharedMemoryMachineMemoryWindowBaseInternalCreator.h"
35
36#include "arcane/core/FactoryService.h"
37#include "arcane/core/IApplication.h"
38#include "arcane/core/ParallelSuperMngDispatcher.h"
39#include "arcane/core/ApplicationBuildInfo.h"
40#include "arcane/core/AbstractService.h"
41#include "arcane/core/ServiceBuilder.h"
42
43#include "arcane/core/IMainFactory.h"
44
45/*---------------------------------------------------------------------------*/
46/*---------------------------------------------------------------------------*/
47
49{
50
51/*---------------------------------------------------------------------------*/
52/*---------------------------------------------------------------------------*/
56class SharedMemoryParallelMngContainer
58{
59 public:
60 SharedMemoryParallelMngContainer(IApplication* app,Int32 nb_local_rank,
61 MP::Communicator mpi_comm,
63 ~SharedMemoryParallelMngContainer() override;
64
65 public:
66
67 void build();
68 Ref<IParallelMng> _createParallelMng(Int32 local_rank,ITraceMng* tm) override;
69
70 public:
71
73 Int32 m_nb_local_rank;
74 SharedMemoryThreadMng* m_thread_mng;
75 ISharedMemoryMessageQueue* m_message_queue = nullptr;
76 Mutex* m_internal_create_mutex = nullptr;
77 IThreadBarrier* m_thread_barrier = nullptr;
78 SharedMemoryAllDispatcher* m_all_dispatchers = nullptr;
79 IParallelMngContainerFactory* m_sub_factory_builder = nullptr;
81
82 private:
83
84 MP::Communicator m_communicator;
85};
86
87/*---------------------------------------------------------------------------*/
88/*---------------------------------------------------------------------------*/
89
90SharedMemoryParallelMngContainer::
91SharedMemoryParallelMngContainer(IApplication* app,Int32 nb_local_rank,
92 MP::Communicator mpi_comm,
94: m_application(app), m_nb_local_rank(nb_local_rank)
95, m_thread_mng(new SharedMemoryThreadMng())
96, m_sub_factory_builder(factory)
97, m_communicator(mpi_comm)
98{
99}
100
101/*---------------------------------------------------------------------------*/
102/*---------------------------------------------------------------------------*/
103
104SharedMemoryParallelMngContainer::
105~SharedMemoryParallelMngContainer()
106{
107 if (m_thread_barrier)
108 m_thread_barrier->destroy();
109 delete m_message_queue;
110 delete m_thread_mng;
111 delete m_all_dispatchers;
112 delete m_internal_create_mutex;
113 delete m_window_creator;
114}
115
116/*---------------------------------------------------------------------------*/
117/*---------------------------------------------------------------------------*/
118
119void SharedMemoryParallelMngContainer::
120build()
121{
122 m_message_queue = new SharedMemoryMessageQueue();
123 m_message_queue->init(m_nb_local_rank);
124
125 m_thread_barrier = platform::getThreadImplementationService()->createBarrier();
126 m_thread_barrier->init(m_nb_local_rank);
127
128 m_all_dispatchers = new SharedMemoryAllDispatcher();
129 m_all_dispatchers->resize(m_nb_local_rank);
130
131 m_internal_create_mutex = new Mutex();
132
133 m_window_creator = new SharedMemoryMachineMemoryWindowBaseInternalCreator(m_nb_local_rank, m_thread_barrier);
134}
135
136/*---------------------------------------------------------------------------*/
137/*---------------------------------------------------------------------------*/
138
140_createParallelMng(Int32 local_rank,ITraceMng* tm)
141{
142 if (local_rank<0 || local_rank>=m_nb_local_rank)
143 ARCANE_THROW(ArgumentException,"Bad value '{0}' for local_rank (max={1})",
144 local_rank,m_nb_local_rank);
145
146 // Cette méthode n'est pas réentrante.
147 Mutex::ScopedLock sl(m_internal_create_mutex);
148
149 IParallelSuperMng* sm = m_application->sequentialParallelSuperMng();
151 build_info.rank = local_rank;
152 build_info.nb_rank = m_nb_local_rank;
153 build_info.trace_mng = tm;
154 build_info.thread_mng = m_thread_mng;
155 build_info.sequential_parallel_mng = sm->internalCreateWorldParallelMng(0);
156 build_info.world_parallel_mng = nullptr;
157 build_info.message_queue = m_message_queue;
158 build_info.thread_barrier = m_thread_barrier;
159 build_info.all_dispatchers = m_all_dispatchers;
160 build_info.sub_builder_factory = m_sub_factory_builder;
161 build_info.container = makeRef<IParallelMngContainer>(this);
162 build_info.window_creator = m_window_creator;
163 // Seul le rang 0 positionne l'éventuel communicateur sinon tous les PE
164 // vont se retrouver avec le même rang MPI
165 if (local_rank==0)
166 build_info.communicator = m_communicator;
167
168 IParallelMng* pm = new SharedMemoryParallelMng(build_info);
169 pm->build();
170
171 return makeRef(pm);
172}
173
174/*---------------------------------------------------------------------------*/
175/*---------------------------------------------------------------------------*/
176
177class SharedMemoryParallelMngContainerFactory
178: public AbstractService
180{
181 public:
182 SharedMemoryParallelMngContainerFactory(const ServiceBuildInfo& sbi)
183 : AbstractService(sbi), m_application(sbi.application()){}
184 public:
185 Ref<IParallelMngContainer> _createParallelMngBuilder(Int32 nb_rank,MP::Communicator comm) override
186 {
187 auto x = new SharedMemoryParallelMngContainer(m_application,nb_rank,comm,this);
188 x->build();
190 }
191 private:
192 IApplication* m_application;
193};
194
195/*---------------------------------------------------------------------------*/
196/*---------------------------------------------------------------------------*/
197
199 ServiceProperty("SharedMemoryParallelMngContainerFactory",ST_Application),
201
202/*---------------------------------------------------------------------------*/
203/*---------------------------------------------------------------------------*/
204
205SharedMemoryParallelSuperMng::
206SharedMemoryParallelSuperMng(const ServiceBuildInfo& sbi)
208{
209}
210
211/*---------------------------------------------------------------------------*/
212/*---------------------------------------------------------------------------*/
213
214SharedMemoryParallelSuperMng::
215SharedMemoryParallelSuperMng(const ServiceBuildInfo& sbi,MP::Communicator comm,
216 bool has_mpi_init)
217: m_application(sbi.application())
218, m_stat(nullptr)
219, m_is_parallel(false)
220, m_communicator(comm)
221{
223 m_has_mpi_init = has_mpi_init;
224}
225
226/*---------------------------------------------------------------------------*/
227/*---------------------------------------------------------------------------*/
228
229SharedMemoryParallelSuperMng::
230~SharedMemoryParallelSuperMng()
231{
232 delete m_stat;
233}
234
235/*---------------------------------------------------------------------------*/
236/*---------------------------------------------------------------------------*/
237
242
243/*---------------------------------------------------------------------------*/
244/*---------------------------------------------------------------------------*/
245
247build()
248{
249 if (!arcaneHasThread())
250 ARCANE_FATAL("Can not create SharedMemoryParallelSuperMng because threads are disabled");
251
252 // Si on a été initialisé avec MPI, alors les requêtes nulles et le communicateur
253 // nul ont déjà été positionnés.
254 if (!m_has_mpi_init){
255 Request::setNullRequest(Request(0,nullptr,0));
256 Parallel::Communicator::setNullCommunicator(Parallel::Communicator((void*)nullptr));
257 }
258
259 m_is_parallel = true;
260 Int32 n = m_application->applicationBuildInfo().nbSharedMemorySubDomain();
261 if (n==0)
262 ARCANE_FATAL("Number of shared memory sub-domains is not defined");
263
264 ITraceMng* app_tm = m_application->traceMng();
265 app_tm->info() << "SharedMemoryParallelSuperMng: nb_local_sub_domain=" << n;
266 app_tm->info() << "SharedMemoryParallelSuperMng: mpi_communicator=" << getMPICommunicator();
268 String service_name = "SharedMemoryParallelMngContainerFactory";
269 m_builder_factory = sb.createReference(service_name);
270 Ref<IParallelMngContainer> x = m_builder_factory->_createParallelMngBuilder(n,communicator());
271 m_main_builder = x;
272 m_container = dynamic_cast<SharedMemoryParallelMngContainer*>(x.get());
273 ARCANE_CHECK_POINTER(m_container);
274}
275
276/*---------------------------------------------------------------------------*/
277/*---------------------------------------------------------------------------*/
278
281{
282 Int32 max_rank = nbLocalSubDomain();
283 if (local_rank<0 || local_rank>=max_rank)
284 ARCANE_THROW(ArgumentException,"Bad value '{0}' for local_rank (max={1})",
285 local_rank,max_rank);
286
287 ITraceMng* tm = nullptr;
288 ITraceMng* app_tm = m_application->traceMng();
289 if (local_rank==0){
290 // Le premier sous-domaine créé utilise le traceMng() par défaut.
291 tm = app_tm;
292 }
293 else{
294 tm = m_application->createAndInitializeTraceMng(app_tm,String::fromNumber(local_rank));
295 }
296
297 Ref<IParallelMng> pm = m_container->_createParallelMng(local_rank,tm);
298 return pm;
299}
300
301/*---------------------------------------------------------------------------*/
302/*---------------------------------------------------------------------------*/
303
305tryAbort()
306{
307 m_application->traceMng()->flush();
308 ::abort();
309}
310
311/*---------------------------------------------------------------------------*/
312/*---------------------------------------------------------------------------*/
313
315broadcast(ByteArrayView send_buf,Int32 process_id)
316{
317 ARCANE_UNUSED(send_buf);
318 ARCANE_UNUSED(process_id);
319}
320
321/*---------------------------------------------------------------------------*/
322/*---------------------------------------------------------------------------*/
323
325broadcast(Int32ArrayView send_buf,Int32 process_id)
326{
327 ARCANE_UNUSED(send_buf);
328 ARCANE_UNUSED(process_id);
329}
330
331/*---------------------------------------------------------------------------*/
332/*---------------------------------------------------------------------------*/
333
335broadcast(Int64ArrayView send_buf,Int32 process_id)
336{
337 ARCANE_UNUSED(send_buf);
338 ARCANE_UNUSED(process_id);
339}
340
341/*---------------------------------------------------------------------------*/
342/*---------------------------------------------------------------------------*/
343
345broadcast(RealArrayView send_buf,Int32 process_id)
346{
347 ARCANE_UNUSED(send_buf);
348 ARCANE_UNUSED(process_id);
349}
350
351/*---------------------------------------------------------------------------*/
352/*---------------------------------------------------------------------------*/
353
355threadMng() const
356{
357 return m_container->m_thread_mng;
358}
359
360/*---------------------------------------------------------------------------*/
361/*---------------------------------------------------------------------------*/
362
365{
366 return m_container->m_nb_local_rank;
367}
368
369/*---------------------------------------------------------------------------*/
370/*---------------------------------------------------------------------------*/
371
373 ServiceProperty("SharedMemoryParallelSuperMng",ST_Application),
375
376// Ancien nom
378 ServiceProperty("ThreadParallelSuperMng",ST_Application),
380
381/*---------------------------------------------------------------------------*/
382/*---------------------------------------------------------------------------*/
383
384} // End namespace Arcane::MessagePassing
385
386/*---------------------------------------------------------------------------*/
387/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCANE_SERVICE_INTERFACE(ainterface)
Macro pour déclarer une interface lors de l'enregistrement d'un service.
AbstractService(const ServiceBuildInfo &)
Constructeur à partir d'un ServiceBuildInfo.
Exception lorsqu'un argument est invalide.
Interface de l'application.
Interface d'une fabrique de conteneur de 'IParallelMng'.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void build()=0
Construit l'instance.
Classe abstraite du superviseur de parallélisme.
virtual Ref< IParallelMng > internalCreateWorldParallelMng(Int32 local_rank)=0
Créé un gestionnaire de parallélisme pour l'ensemble des coeurs alloués.
virtual void destroy()=0
Détruit la barrière.
virtual void init(Integer nb_thread)=0
Initialise la barrière pour nb_thread.
Interface d'un gestionnaire de thread.
Definition IThreadMng.h:30
Interface du gestionnaire de traces.
virtual TraceMessage info()=0
Flot pour un message d'information.
Communicateur pour l'échange de message.
Interface d'une file de messages avec les threads.
Conteneur des informations du gestionnaire de message en mémoire partagée.
Ref< IParallelMng > _createParallelMng(Int32 local_rank, ITraceMng *tm) override
Créé le IParallelMng pour le rang local local_rank.
Gestionnaire du parallélisme utilisant les threads.
Superviseur du parallélisme utilisant les threads.
Ref< IParallelMng > internalCreateWorldParallelMng(Int32 local_rank) override
Créé un gestionnaire de parallélisme pour l'ensemble des coeurs alloués.
void build() override
Construit les membres l'instance.
void broadcast(ByteArrayView send_buf, Int32 rank) override
Envoie un tableau de valeurs sur tous les processus Cette opération synchronise le tableau de valeur ...
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
Parallel::Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
Int32 nbLocalSubDomain() override
Nombre de sous-domaines à créér localement.
IThreadMng * threadMng() const override
Gestionnaire de thread.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
IApplication * application() const
Accès à l'application IApplication associé.
Structure contenant les informations pour créer un service.
Classe utilitaire pour instantier un service d'une interface donnée.
Ref< InterfaceType > createReference(const String &name, eServiceBuilderProperties properties=SB_None)
Créé une instance implémentant l'interface InterfaceType.
Propriétés de création d'un service.
Chaîne de caractères unicode.
#define ARCANE_REGISTER_SERVICE(aclass, a_service_property,...)
Macro pour enregistrer un service.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
IStat * createDefaultStat()
Créé une instance par défaut.
IThreadImplementation * getThreadImplementationService()
Service utilisé pour gérer les threads.
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:538
bool arcaneHasThread()
Vrai si arcane est compilé avec le support des threads ET qu'ils sont actifs.
Definition Misc.cc:99
ArrayView< Byte > ByteArrayView
Equivalent C d'un tableau à une dimension de caractères.
Definition UtilsTypes.h:534
@ ST_Application
Le service s'utilise au niveau de l'application.
ArrayView< Int32 > Int32ArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:540
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
ArrayView< Real > RealArrayView
Equivalent C d'un tableau à une dimension de réels.
Definition UtilsTypes.h:546
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un SharedMemoryParallelMng.