Arcane  v4.1.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
SharedMemoryParallelSuperMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SharedMemoryParallelSuperMng.cc (C) 2000-2025 */
9/* */
10/* Gestionnaire de messages utilisant uniquement la mémoire partagée. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/thread/SharedMemoryParallelSuperMng.h"
15
16#include "arcane/utils/ApplicationInfo.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/NotImplementedException.h"
19#include "arcane/utils/FatalErrorException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/ValueConvert.h"
22#include "arcane/utils/IThreadMng.h"
23#include "arcane/utils/TraceClassConfig.h"
24#include "arcane/utils/ITraceMng.h"
25#include "arcane/utils/IThreadImplementation.h"
26#include "arcane/utils/Mutex.h"
27
28#include "arcane/parallel/IStat.h"
29
30#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
31#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
32#include "arcane/parallel/thread/SharedMemoryMessageQueue.h"
33#include "arcane/parallel/thread/internal/SharedMemoryThreadMng.h"
34#include "arcane/parallel/thread/internal/SharedMemoryMachineMemoryWindowBaseInternalCreator.h"
35
36#include "arcane/core/FactoryService.h"
37#include "arcane/core/IApplication.h"
38#include "arcane/core/ParallelSuperMngDispatcher.h"
39#include "arcane/core/ApplicationBuildInfo.h"
40#include "arcane/core/AbstractService.h"
41#include "arcane/core/ServiceBuilder.h"
42
43#include "arcane/core/IMainFactory.h"
44
45/*---------------------------------------------------------------------------*/
46/*---------------------------------------------------------------------------*/
47
49{
50
51/*---------------------------------------------------------------------------*/
52/*---------------------------------------------------------------------------*/
56class SharedMemoryParallelMngContainer
58{
59 public:
60 SharedMemoryParallelMngContainer(IApplication* app,Int32 nb_local_rank,
61 MP::Communicator mpi_comm,
63 ~SharedMemoryParallelMngContainer() override;
64
65 public:
66
67 void build();
68 Ref<IParallelMng> _createParallelMng(Int32 local_rank,ITraceMng* tm) override;
69
70 public:
71
73 Int32 m_nb_local_rank;
74 SharedMemoryThreadMng* m_thread_mng;
75 ISharedMemoryMessageQueue* m_message_queue = nullptr;
76 Mutex* m_internal_create_mutex = nullptr;
77 IThreadBarrier* m_thread_barrier = nullptr;
78 SharedMemoryAllDispatcher* m_all_dispatchers = nullptr;
79 IParallelMngContainerFactory* m_sub_factory_builder = nullptr;
81
82 private:
83
84 MP::Communicator m_communicator;
85};
86
87/*---------------------------------------------------------------------------*/
88/*---------------------------------------------------------------------------*/
89
90SharedMemoryParallelMngContainer::
91SharedMemoryParallelMngContainer(IApplication* app,Int32 nb_local_rank,
92 MP::Communicator mpi_comm,
94: m_application(app), m_nb_local_rank(nb_local_rank)
95, m_thread_mng(new SharedMemoryThreadMng())
96, m_sub_factory_builder(factory)
97, m_communicator(mpi_comm)
98{
99}
100
101/*---------------------------------------------------------------------------*/
102/*---------------------------------------------------------------------------*/
103
104SharedMemoryParallelMngContainer::
105~SharedMemoryParallelMngContainer()
106{
107 if (m_thread_barrier)
108 m_thread_barrier->destroy();
109 delete m_message_queue;
110 delete m_thread_mng;
111 delete m_all_dispatchers;
112 delete m_internal_create_mutex;
113 delete m_window_creator;
114}
115
116/*---------------------------------------------------------------------------*/
117/*---------------------------------------------------------------------------*/
118
119void SharedMemoryParallelMngContainer::
120build()
121{
122 m_message_queue = new SharedMemoryMessageQueue();
123 m_message_queue->init(m_nb_local_rank);
124
125 m_thread_barrier = platform::getThreadImplementationService()->createBarrier();
126 m_thread_barrier->init(m_nb_local_rank);
127
128 m_all_dispatchers = new SharedMemoryAllDispatcher();
129 m_all_dispatchers->resize(m_nb_local_rank);
130
131 m_internal_create_mutex = new Mutex();
132
133 m_window_creator = new SharedMemoryMachineMemoryWindowBaseInternalCreator(m_nb_local_rank, m_thread_barrier);
134}
135
136/*---------------------------------------------------------------------------*/
137/*---------------------------------------------------------------------------*/
138
140_createParallelMng(Int32 local_rank,ITraceMng* tm)
141{
142 if (local_rank<0 || local_rank>=m_nb_local_rank)
143 ARCANE_THROW(ArgumentException,"Bad value '{0}' for local_rank (max={1})",
144 local_rank,m_nb_local_rank);
145
146 // Cette méthode n'est pas réentrante.
147 Mutex::ScopedLock sl(m_internal_create_mutex);
148
149 IParallelSuperMng* sm = m_application->sequentialParallelSuperMng();
151 build_info.rank = local_rank;
152 build_info.nb_rank = m_nb_local_rank;
153 build_info.trace_mng = tm;
154 build_info.thread_mng = m_thread_mng;
155 build_info.sequential_parallel_mng = sm->internalCreateWorldParallelMng(0);
156 build_info.world_parallel_mng = nullptr;
157 build_info.message_queue = m_message_queue;
158 build_info.thread_barrier = m_thread_barrier;
159 build_info.all_dispatchers = m_all_dispatchers;
160 build_info.sub_builder_factory = m_sub_factory_builder;
161 build_info.container = makeRef<IParallelMngContainer>(this);
162 build_info.window_creator = m_window_creator;
163 // Seul le rang 0 positionne l'éventuel communicateur sinon tous les PE
164 // vont se retrouver avec le même rang MPI
165 if (local_rank==0)
166 build_info.communicator = m_communicator;
167
168 IParallelMng* pm = new SharedMemoryParallelMng(build_info);
169 pm->build();
170
171 return makeRef(pm);
172}
173
174/*---------------------------------------------------------------------------*/
175/*---------------------------------------------------------------------------*/
176
177class SharedMemoryParallelMngContainerFactory
178: public AbstractService
180{
181 public:
182 SharedMemoryParallelMngContainerFactory(const ServiceBuildInfo& sbi)
183 : AbstractService(sbi), m_application(sbi.application()){}
184 public:
185 Ref<IParallelMngContainer> _createParallelMngBuilder(Int32 nb_rank, MP::Communicator comm, MP::Communicator machine_comm) override
186 {
187 ARCANE_UNUSED(machine_comm);
188 auto x = new SharedMemoryParallelMngContainer(m_application,nb_rank,comm,this);
189 x->build();
191 }
192 private:
193 IApplication* m_application;
194};
195
196/*---------------------------------------------------------------------------*/
197/*---------------------------------------------------------------------------*/
198
200 ServiceProperty("SharedMemoryParallelMngContainerFactory",ST_Application),
202
203/*---------------------------------------------------------------------------*/
204/*---------------------------------------------------------------------------*/
205
206SharedMemoryParallelSuperMng::
207SharedMemoryParallelSuperMng(const ServiceBuildInfo& sbi)
209{
210}
211
212/*---------------------------------------------------------------------------*/
213/*---------------------------------------------------------------------------*/
214
215SharedMemoryParallelSuperMng::
216SharedMemoryParallelSuperMng(const ServiceBuildInfo& sbi,MP::Communicator comm,
217 bool has_mpi_init)
218: m_application(sbi.application())
219, m_stat(nullptr)
220, m_is_parallel(false)
221, m_communicator(comm)
222{
224 m_has_mpi_init = has_mpi_init;
225}
226
227/*---------------------------------------------------------------------------*/
228/*---------------------------------------------------------------------------*/
229
230SharedMemoryParallelSuperMng::
231~SharedMemoryParallelSuperMng()
232{
233 delete m_stat;
234}
235
236/*---------------------------------------------------------------------------*/
237/*---------------------------------------------------------------------------*/
238
243
244/*---------------------------------------------------------------------------*/
245/*---------------------------------------------------------------------------*/
246
248build()
249{
250 if (!arcaneHasThread())
251 ARCANE_FATAL("Can not create SharedMemoryParallelSuperMng because threads are disabled");
252
253 // Si on a été initialisé avec MPI, alors les requêtes nulles et le communicateur
254 // nul ont déjà été positionnés.
255 if (!m_has_mpi_init){
256 Request::setNullRequest(Request(0,nullptr,0));
257 Parallel::Communicator::setNullCommunicator(Parallel::Communicator((void*)nullptr));
258 }
259
260 m_is_parallel = true;
261 Int32 n = m_application->applicationBuildInfo().nbSharedMemorySubDomain();
262 if (n==0)
263 ARCANE_FATAL("Number of shared memory sub-domains is not defined");
264
265 ITraceMng* app_tm = m_application->traceMng();
266 app_tm->info() << "SharedMemoryParallelSuperMng: nb_local_sub_domain=" << n;
267 app_tm->info() << "SharedMemoryParallelSuperMng: mpi_communicator=" << getMPICommunicator();
269 String service_name = "SharedMemoryParallelMngContainerFactory";
270 m_builder_factory = sb.createReference(service_name);
271 Ref<IParallelMngContainer> x = m_builder_factory->_createParallelMngBuilder(n, communicator(), communicator());
272 m_main_builder = x;
273 m_container = dynamic_cast<SharedMemoryParallelMngContainer*>(x.get());
274 ARCANE_CHECK_POINTER(m_container);
275}
276
277/*---------------------------------------------------------------------------*/
278/*---------------------------------------------------------------------------*/
279
282{
283 Int32 max_rank = nbLocalSubDomain();
284 if (local_rank<0 || local_rank>=max_rank)
285 ARCANE_THROW(ArgumentException,"Bad value '{0}' for local_rank (max={1})",
286 local_rank,max_rank);
287
288 ITraceMng* tm = nullptr;
289 ITraceMng* app_tm = m_application->traceMng();
290 if (local_rank==0){
291 // Le premier sous-domaine créé utilise le traceMng() par défaut.
292 tm = app_tm;
293 }
294 else{
295 tm = m_application->createAndInitializeTraceMng(app_tm,String::fromNumber(local_rank));
296 }
297
298 Ref<IParallelMng> pm = m_container->_createParallelMng(local_rank,tm);
299 return pm;
300}
301
302/*---------------------------------------------------------------------------*/
303/*---------------------------------------------------------------------------*/
304
306tryAbort()
307{
308 m_application->traceMng()->flush();
309 ::abort();
310}
311
312/*---------------------------------------------------------------------------*/
313/*---------------------------------------------------------------------------*/
314
316broadcast(ByteArrayView send_buf,Int32 process_id)
317{
318 ARCANE_UNUSED(send_buf);
319 ARCANE_UNUSED(process_id);
320}
321
322/*---------------------------------------------------------------------------*/
323/*---------------------------------------------------------------------------*/
324
326broadcast(Int32ArrayView send_buf,Int32 process_id)
327{
328 ARCANE_UNUSED(send_buf);
329 ARCANE_UNUSED(process_id);
330}
331
332/*---------------------------------------------------------------------------*/
333/*---------------------------------------------------------------------------*/
334
336broadcast(Int64ArrayView send_buf,Int32 process_id)
337{
338 ARCANE_UNUSED(send_buf);
339 ARCANE_UNUSED(process_id);
340}
341
342/*---------------------------------------------------------------------------*/
343/*---------------------------------------------------------------------------*/
344
346broadcast(RealArrayView send_buf,Int32 process_id)
347{
348 ARCANE_UNUSED(send_buf);
349 ARCANE_UNUSED(process_id);
350}
351
352/*---------------------------------------------------------------------------*/
353/*---------------------------------------------------------------------------*/
354
356threadMng() const
357{
358 return m_container->m_thread_mng;
359}
360
361/*---------------------------------------------------------------------------*/
362/*---------------------------------------------------------------------------*/
363
366{
367 return m_container->m_nb_local_rank;
368}
369
370/*---------------------------------------------------------------------------*/
371/*---------------------------------------------------------------------------*/
372
374 ServiceProperty("SharedMemoryParallelSuperMng",ST_Application),
376
377// Ancien nom
379 ServiceProperty("ThreadParallelSuperMng",ST_Application),
381
382/*---------------------------------------------------------------------------*/
383/*---------------------------------------------------------------------------*/
384
385} // End namespace Arcane::MessagePassing
386
387/*---------------------------------------------------------------------------*/
388/*---------------------------------------------------------------------------*/
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCANE_SERVICE_INTERFACE(ainterface)
Macro pour déclarer une interface lors de l'enregistrement d'un service.
AbstractService(const ServiceBuildInfo &)
Constructeur à partir d'un ServiceBuildInfo.
Exception lorsqu'un argument est invalide.
Interface de l'application.
Interface d'une fabrique de conteneur de 'IParallelMng'.
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual void build()=0
Construit l'instance.
Classe abstraite du superviseur de parallélisme.
virtual Ref< IParallelMng > internalCreateWorldParallelMng(Int32 local_rank)=0
Créé un gestionnaire de parallélisme pour l'ensemble des coeurs alloués.
virtual void destroy()=0
Détruit la barrière.
virtual void init(Integer nb_thread)=0
Initialise la barrière pour nb_thread.
Interface d'un gestionnaire de thread.
Definition IThreadMng.h:30
Interface du gestionnaire de traces.
virtual TraceMessage info()=0
Flot pour un message d'information.
Communicateur pour l'échange de message.
Interface d'une file de messages avec les threads.
Conteneur des informations du gestionnaire de message en mémoire partagée.
Ref< IParallelMng > _createParallelMng(Int32 local_rank, ITraceMng *tm) override
Créé le IParallelMng pour le rang local local_rank.
Gestionnaire du parallélisme utilisant les threads.
Superviseur du parallélisme utilisant les threads.
Ref< IParallelMng > internalCreateWorldParallelMng(Int32 local_rank) override
Créé un gestionnaire de parallélisme pour l'ensemble des coeurs alloués.
void build() override
Construit les membres l'instance.
void broadcast(ByteArrayView send_buf, Int32 rank) override
Envoie un tableau de valeurs sur tous les processus Cette opération synchronise le tableau de valeur ...
void * getMPICommunicator() override
Adresse du communicateur MPI associé à ce gestionnaire.
Parallel::Communicator communicator() const override
Communicateur MPI associé à ce gestionnaire.
Int32 nbLocalSubDomain() override
Nombre de sous-domaines à créér localement.
IThreadMng * threadMng() const override
Gestionnaire de thread.
Référence à une instance.
InstanceType * get() const
Instance associée ou nullptr si aucune.
IApplication * application() const
Accès à l'application IApplication associé.
Structure contenant les informations pour créer un service.
Classe utilitaire pour instantier un service d'une interface donnée.
Ref< InterfaceType > createReference(const String &name, eServiceBuilderProperties properties=SB_None)
Créé une instance implémentant l'interface InterfaceType.
Propriétés de création d'un service.
Chaîne de caractères unicode.
#define ARCANE_REGISTER_SERVICE(aclass, a_service_property,...)
Macro pour enregistrer un service.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
IStat * createDefaultStat()
Créé une instance par défaut.
IThreadImplementation * getThreadImplementationService()
Service utilisé pour gérer les threads.
ArrayView< Int64 > Int64ArrayView
Equivalent C d'un tableau à une dimension d'entiers 64 bits.
Definition UtilsTypes.h:467
bool arcaneHasThread()
Vrai si arcane est compilé avec le support des threads ET qu'ils sont actifs.
Definition Misc.cc:99
ArrayView< Byte > ByteArrayView
Equivalent C d'un tableau à une dimension de caractères.
Definition UtilsTypes.h:463
@ ST_Application
Le service s'utilise au niveau de l'application.
ArrayView< Int32 > Int32ArrayView
Equivalent C d'un tableau à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:469
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Créé une référence sur un pointeur.
ArrayView< Real > RealArrayView
Equivalent C d'un tableau à une dimension de réels.
Definition UtilsTypes.h:475
std::int32_t Int32
Type entier signé sur 32 bits.
Infos pour construire un SharedMemoryParallelMng.