Arcane  v3.14.10.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
ConcurrencyUtils.h
Aller à la documentation de ce fichier.
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ConcurrencyUtils.h (C) 2000-2024 */
9/* */
10/* Classes gérant la concurrence (tâches, boucles parallèles, ...) */
11/*---------------------------------------------------------------------------*/
12#ifndef ARCANE_UTILS_CONCURRENCYUTILS_H
13#define ARCANE_UTILS_CONCURRENCYUTILS_H
14/*---------------------------------------------------------------------------*/
15/*---------------------------------------------------------------------------*/
16
18#include "arcane/utils/RangeFunctor.h"
19#include "arcane/utils/FatalErrorException.h"
20#include "arcane/utils/ForLoopTraceInfo.h"
21#include "arcane/utils/ParallelLoopOptions.h"
22
23#include <optional>
24
25/*---------------------------------------------------------------------------*/
26/*---------------------------------------------------------------------------*/
27
28namespace Arcane
29{
30
31/*---------------------------------------------------------------------------*/
32/*---------------------------------------------------------------------------*/
33/*
34 * TODO:
35 * - Vérifier les fuites memoires
36 * - BIEN INDIQUER QU'IL NE FAUT PLUS UTILISER UNE TACHE APRES LE WAIT!!!
37 * - Regarder mecanisme pour les exceptions.
38 * - Surcharger les For et Foreach sans specifier le block_size
39 */
40
41/*---------------------------------------------------------------------------*/
42/*---------------------------------------------------------------------------*/
47class ARCANE_UTILS_EXPORT TaskContext
48{
49 public:
50 explicit TaskContext(ITask* atask) : m_task(atask) {}
51 public:
53 ITask* task() const { return m_task; }
54 private:
55 ITask* m_task;
56};
57
58/*---------------------------------------------------------------------------*/
59/*---------------------------------------------------------------------------*/
65class ARCANE_UTILS_EXPORT ITaskFunctor
66{
67 public:
68 virtual ~ITaskFunctor() = default;
69 protected:
70 ITaskFunctor(const ITaskFunctor&) = default;
71 ITaskFunctor() = default;
72 public:
74 virtual void executeFunctor(const TaskContext& tc) =0;
75 virtual ITaskFunctor* clone(void* buffer,Integer size) =0;
76};
77
78/*---------------------------------------------------------------------------*/
79/*---------------------------------------------------------------------------*/
85template<typename InstanceType>
87: public ITaskFunctor
88{
89 public:
90 typedef void (InstanceType::*FunctorType)();
91 public:
92 TaskFunctor(InstanceType* instance,FunctorType func)
93 : m_instance(instance), m_function(func)
94 {
95 }
96 TaskFunctor(const TaskFunctor& rhs) = default;
97 TaskFunctor& operator=(const TaskFunctor& rhs) = delete;
98 public:
100 void executeFunctor(const TaskContext& /*tc*/) override
101 {
102 (m_instance->*m_function)();
103 }
104 ITaskFunctor* clone(void* buffer,Integer size) override
105 {
106 if (sizeof(*this)>(size_t)size)
107 ARCANE_FATAL("INTERNAL: task functor buffer is too small");
108 return new (buffer) TaskFunctor<InstanceType>(*this);
109 }
110 private:
111 InstanceType* m_instance;
112 FunctorType m_function;
113};
114
115/*---------------------------------------------------------------------------*/
116/*---------------------------------------------------------------------------*/
122template<typename InstanceType>
124: public ITaskFunctor
125{
126 public:
127 typedef void (InstanceType::*FunctorType)(const TaskContext& tc);
128 public:
129 TaskFunctorWithContext(InstanceType* instance,FunctorType func)
130 : ITaskFunctor(), m_instance(instance), m_function(func)
131 {
132 }
133 public:
135 void executeFunctor(const TaskContext& tc) override
136 {
137 (m_instance->*m_function)(tc);
138 }
139 ITaskFunctor* clone(void* buffer,Integer size) override
140 {
141 if (sizeof(*this)>(size_t)size)
142 ARCANE_FATAL("INTERNAL: task functor buffer is too small");
143 return new (buffer) TaskFunctorWithContext<InstanceType>(*this);
144 }
145 private:
146 InstanceType* m_instance;
147 FunctorType m_function;
148};
149
150/*---------------------------------------------------------------------------*/
151/*---------------------------------------------------------------------------*/
158class ARCANE_UTILS_EXPORT ITask
159{
160 friend class TaskFactory;
161
162 public:
163
164 virtual ~ITask(){}
165
166 public:
173 virtual void launchAndWait() =0;
179
180 protected:
181
182 virtual ITask* _createChildTask(ITaskFunctor* functor) =0;
183};
184
185/*---------------------------------------------------------------------------*/
186/*---------------------------------------------------------------------------*/
196class ARCANE_UTILS_EXPORT ITaskImplementation
197{
198 public:
199 virtual ~ITaskImplementation(){}
200 public:
209 virtual void initialize(Int32 nb_thread) =0;
215 virtual void terminate() =0;
222
224 virtual void executeParallelFor(Integer begin,Integer size,const ParallelLoopOptions& options,IRangeFunctor* f) =0;
225
227 virtual void executeParallelFor(Integer begin,Integer size,Integer block_size,IRangeFunctor* f) =0;
228
230 virtual void executeParallelFor(Integer begin,Integer size,IRangeFunctor* f) =0;
231
234
237 const ParallelLoopOptions& options,
238 IMDRangeFunctor<1>* functor) =0;
241 const ParallelLoopOptions& options,
242 IMDRangeFunctor<2>* functor) =0;
245 const ParallelLoopOptions& options,
246 IMDRangeFunctor<3>* functor) =0;
249 const ParallelLoopOptions& options,
250 IMDRangeFunctor<4>* functor) =0;
251
253 virtual bool isActive() const =0;
254
256 virtual Int32 nbAllowedThread() const =0;
257
259 virtual Int32 currentTaskThreadIndex() const =0;
260
262 virtual Int32 currentTaskIndex() const =0;
263
265 virtual void printInfos(std::ostream& o) const =0;
266};
267
268/*---------------------------------------------------------------------------*/
269/*---------------------------------------------------------------------------*/
274class ARCANE_UTILS_EXPORT TaskFactory
275{
276 private:
277 TaskFactory();
278 public:
279 public:
280
286 template<typename InstanceType> static ITask*
287 createTask(InstanceType* instance,void (InstanceType::*function)(const TaskContext& tc))
288 {
289 TaskFunctorWithContext<InstanceType> functor(instance,function);
290 return m_impl->createRootTask(&functor);
291 }
292
298 template<typename InstanceType> static ITask*
299 createTask(InstanceType* instance,void (InstanceType::*function)())
300 {
301 TaskFunctor<InstanceType> functor(instance,function);
302 return m_impl->createRootTask(&functor);
303 }
304
311 template<typename InstanceType> static ITask*
312 createChildTask(ITask* parent_task,InstanceType* instance,void (InstanceType::*function)(const TaskContext& tc))
313 {
315 TaskFunctorWithContext<InstanceType> functor(instance,function);
316 return parent_task->_createChildTask(&functor);
317 }
318
325 template<typename InstanceType> static ITask*
326 createChildTask(ITask* parent_task,InstanceType* instance,void (InstanceType::*function)())
327 {
329 TaskFunctor<InstanceType> functor(instance,function);
330 return parent_task->_createChildTask(&functor);
331 }
332
334 static void executeParallelFor(Integer begin,Integer size,const ParallelLoopOptions& options,IRangeFunctor* f)
335 {
336 m_impl->executeParallelFor(begin,size,options,f);
337 }
338
340 static void executeParallelFor(Integer begin,Integer size,Integer block_size,IRangeFunctor* f)
341 {
342 m_impl->executeParallelFor(begin,size,block_size,f);
343 }
344
346 static void executeParallelFor(Integer begin,Integer size,IRangeFunctor* f)
347 {
348 m_impl->executeParallelFor(begin,size,f);
349 }
350
353 {
354 m_impl->executeParallelFor(loop_info);
355 }
356
359 const ParallelLoopOptions& options,
360 IMDRangeFunctor<1>* functor)
361 {
362 m_impl->executeParallelFor(loop_ranges,options,functor);
363 }
364
367 const ParallelLoopOptions& options,
368 IMDRangeFunctor<2>* functor)
369 {
370 m_impl->executeParallelFor(loop_ranges,options,functor);
371 }
372
375 const ParallelLoopOptions& options,
376 IMDRangeFunctor<3>* functor)
377 {
378 m_impl->executeParallelFor(loop_ranges,options,functor);
379 }
380
383 const ParallelLoopOptions& options,
384 IMDRangeFunctor<4>* functor)
385 {
386 m_impl->executeParallelFor(loop_ranges,options,functor);
387 }
388
390 static Int32 nbAllowedThread()
391 {
392 return m_impl->nbAllowedThread();
393 }
394
403 {
404 return m_impl->currentTaskThreadIndex();
405 }
406
420 static Int32 currentTaskIndex()
421 {
422 return m_impl->currentTaskIndex();
423 }
424
425 public:
426
429 {
430 m_default_loop_options = v;
431 }
432
435 {
436 return m_default_loop_options;
437 }
438
439 public:
440
446 static bool isActive()
447 {
448 return m_impl->isActive();
449 }
450
457 static void printInfos(std::ostream& o)
458 {
459 return m_impl->printInfos(o);
460 }
461
470 static IObservable* createThreadObservable();
471
480 static IObservable* destroyThreadObservable();
481
487 static void terminate();
488
489 public:
490
492 static void setVerboseLevel(Integer v) { m_verbose_level = v; }
493
495 static Integer verboseLevel() { return m_verbose_level; }
496
497 public:
498
500 static void _internalSetImplementation(ITaskImplementation* task_impl);
501
502 private:
503
504 static ITaskImplementation* m_impl;
505 static IObservable* m_created_thread_observable;
506 static IObservable* m_destroyed_thread_observable;
507 static Int32 m_verbose_level;
508 static ParallelLoopOptions m_default_loop_options;
509};
510
511/*---------------------------------------------------------------------------*/
512/*---------------------------------------------------------------------------*/
519class ARCANE_UTILS_EXPORT ForLoopRunInfo
520{
521 public:
522
524
525 public:
526
527 ForLoopRunInfo() = default;
528 explicit ForLoopRunInfo(const ParallelLoopOptions& options)
529 : m_options(options) {}
531 : m_options(options), m_trace_info(trace_info) {}
533 : m_trace_info(trace_info) {}
534
535 public:
536
537 std::optional<ParallelLoopOptions> options() const { return m_options; }
538 ThatClass& addOptions(const ParallelLoopOptions& v) { m_options = v; return (*this); }
539 const ForLoopTraceInfo& traceInfo() const { return m_trace_info; }
540 ThatClass& addTraceInfo(const ForLoopTraceInfo& v) { m_trace_info = v; return (*this); }
541
547 void setExecStat(ForLoopOneExecStat* v) { m_exec_stat = v; }
548
550 ForLoopOneExecStat* execStat() const { return m_exec_stat; }
551
552 protected:
553
554 std::optional<ParallelLoopOptions> m_options;
555 ForLoopTraceInfo m_trace_info;
556 ForLoopOneExecStat* m_exec_stat = nullptr;
557};
558
559/*---------------------------------------------------------------------------*/
560/*---------------------------------------------------------------------------*/
567class ARCANE_UTILS_EXPORT ParallelFor1DLoopInfo
568{
569 public:
570
572
573 public:
574
575 ParallelFor1DLoopInfo(Int32 begin,Int32 size,IRangeFunctor* functor)
576 : m_begin(begin), m_size(size), m_functor(functor) {}
577 ParallelFor1DLoopInfo(Int32 begin,Int32 size,IRangeFunctor* functor,const ForLoopRunInfo& run_info)
578 : m_run_info(run_info), m_begin(begin), m_size(size), m_functor(functor) {}
579 ParallelFor1DLoopInfo(Int32 begin,Int32 size, Int32 block_size,IRangeFunctor* functor)
580 : m_begin(begin), m_size(size), m_functor(functor)
581 {
582 ParallelLoopOptions opts(TaskFactory::defaultParallelLoopOptions());
583 opts.setGrainSize(block_size);
584 m_run_info.addOptions(opts);
585 }
586
587 public:
588
589 Int32 beginIndex() const { return m_begin; }
590 Int32 size() const { return m_size; }
591 IRangeFunctor* functor() const { return m_functor; }
592 ForLoopRunInfo& runInfo() { return m_run_info; }
593 const ForLoopRunInfo& runInfo() const { return m_run_info; }
594
595 private:
596
597 ForLoopRunInfo m_run_info;
598 Int32 m_begin = 0;
599 Int32 m_size = 0;
600 IRangeFunctor* m_functor = nullptr;
601};
602
603/*---------------------------------------------------------------------------*/
604/*---------------------------------------------------------------------------*/
609template<int RankValue,typename LambdaType,typename... ReducerArgs> inline void
611 const ParallelLoopOptions& options,
613 const ReducerArgs&... reducer_args)
614{
615 // Modif Arcane 3.7.9 (septembre 2022)
616 // Effectue une copie pour privatiser au thread courant les valeurs de la lambda.
617 // Cela est nécessaire pour que objets comme les reducers soient bien pris
618 // en compte.
619 // TODO: regarder si on pourrait faire la copie uniquement une fois par thread
620 // si cette copie devient couteuse.
621 // NOTE: A partir de la version 3.12.15 (avril 2024), avec la nouvelle version
622 // des réducteurs (Reduce2), cette privatisation n'est plus utile. Une fois
623 // qu'on aura supprimer les anciennes classes gérant les réductions (Reduce),
624 // on pourra supprimer cette privatisation
626 {
627 using Type = typename std::remove_reference<LambdaType>::type;
630 };
633}
634
635/*---------------------------------------------------------------------------*/
636/*---------------------------------------------------------------------------*/
641template <int RankValue, typename LambdaType, typename... ReducerArgs> inline void
650
651/*---------------------------------------------------------------------------*/
652/*---------------------------------------------------------------------------*/
657template<int RankValue,typename LambdaType> inline void
664
665/*---------------------------------------------------------------------------*/
666/*---------------------------------------------------------------------------*/
671template<int RankValue,typename LambdaType> inline void
679
680/*---------------------------------------------------------------------------*/
681/*---------------------------------------------------------------------------*/
682
683} // End namespace Arcane
684
685/*---------------------------------------------------------------------------*/
686/*---------------------------------------------------------------------------*/
687
688#endif
#define ARCANE_CHECK_POINTER(ptr)
Macro retournant le pointeur ptr s'il est non nul ou lancant une exception s'il est nul.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Déclarations des types utilisés dans Arcane.
Classe pour gérer le profiling d'une seule exécution d'une boucle.
Definition Profiling.h:93
Informations d'exécution d'une boucle.
ForLoopOneExecStat * execStat() const
Pointeur contenant les statistiques d'exécution.
void setExecStat(ForLoopOneExecStat *v)
Positionne le pointeur conservant les statistiques d'exécution.
Informations de trace pour une boucle 'for'.
Interface d'un observable.
Interface d'un fonctor sur un interval d'itération.
Interface d'un fonctor pour une tâche.
virtual void executeFunctor(const TaskContext &tc)=0
Exécute la méthode associé
Implémentation d'une fabrique de tâches.
virtual void executeParallelFor(Integer begin, Integer size, const ParallelLoopOptions &options, IRangeFunctor *f)=0
Exécute le fonctor f en concurrence.
virtual void executeParallelFor(Integer begin, Integer size, IRangeFunctor *f)=0
Exécute le fonctor f en concurrence.
virtual void executeParallelFor(const ParallelFor1DLoopInfo &loop_info)=0
Exécute la boucle loop_info en concurrence.
virtual Int32 nbAllowedThread() const =0
Nombre de threads utilisés au maximum pour gérer les tâches.
virtual void executeParallelFor(Integer begin, Integer size, Integer block_size, IRangeFunctor *f)=0
Exécute le fonctor f en concurrence.
virtual ITask * createRootTask(ITaskFunctor *f)=0
Créé une tâche racine. L'implémentation doit recopier la valeur de f qui est soit un TaskFunctor,...
virtual Int32 currentTaskIndex() const =0
Implémentation de TaskFactory::currentTaskIndex()
virtual void executeParallelFor(const ComplexForLoopRanges< 4 > &loop_ranges, const ParallelLoopOptions &options, IMDRangeFunctor< 4 > *functor)=0
Exécute une boucle 4D en concurrence.
virtual bool isActive() const =0
Indique si l'implémentation est active.
virtual void executeParallelFor(const ComplexForLoopRanges< 1 > &loop_ranges, const ParallelLoopOptions &options, IMDRangeFunctor< 1 > *functor)=0
Exécute une boucle 1D en concurrence.
virtual void executeParallelFor(const ComplexForLoopRanges< 2 > &loop_ranges, const ParallelLoopOptions &options, IMDRangeFunctor< 2 > *functor)=0
Exécute une boucle 2D en concurrence.
virtual void printInfos(std::ostream &o) const =0
Affiche les informations sur le runtime utilisé
virtual void terminate()=0
virtual Int32 currentTaskThreadIndex() const =0
Implémentation de TaskFactory::currentTaskThreadIndex()
virtual void executeParallelFor(const ComplexForLoopRanges< 3 > &loop_ranges, const ParallelLoopOptions &options, IMDRangeFunctor< 3 > *functor)=0
Exécute une boucle 3D en concurrence.
virtual void initialize(Int32 nb_thread)=0
Interface d'une tâche concourante.
virtual void launchAndWait()=0
Lance la tâche et bloque jusqu'à ce qu'elle se termine.
virtual void launchAndWait(ConstArrayView< ITask * > tasks)=0
Lance les tâches filles tasks et bloque jusqu'à ce qu'elles se terminent.
Fonctor sur un interval d'itération instancié via une lambda fonction.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:120
Caractéristiques d'un boucle 1D multi-thread.
Options d'exécution d'une boucle parallèle en multi-thread.
Contexte d'éxecution d'une tâche.
ITask * task() const
Tâche courante.
Fabrique pour les tâches.
static Int32 currentTaskIndex()
Indice (entre 0 et nbAllowedThread()-1) de la tâche actuelle.
static void executeParallelFor(const ParallelFor1DLoopInfo &loop_info)
Exécute la boucle loop_info en concurrence.
static void setVerboseLevel(Integer v)
Positionne le niveau de verbosité (0 pour pas d'affichage qui est le défaut)
static Int32 nbAllowedThread()
Nombre de threads utilisés au maximum pour gérer les tâches.
static void executeParallelFor(Integer begin, Integer size, IRangeFunctor *f)
Exécute le fonctor f en concurrence.
static ITask * createTask(InstanceType *instance, void(InstanceType::*function)())
Créé une tâche. Lors de l'exécution, la tâche appellera la méthode function via l'instance instance.
static const ParallelLoopOptions & defaultParallelLoopOptions()
Valeurs par défaut d'exécution d'une boucle parallèle.
static void executeParallelFor(const ComplexForLoopRanges< 1 > &loop_ranges, const ParallelLoopOptions &options, IMDRangeFunctor< 1 > *functor)
Exécute une boucle simple.
static Integer verboseLevel()
Niveau de verbosité
static void executeParallelFor(const ComplexForLoopRanges< 4 > &loop_ranges, const ParallelLoopOptions &options, IMDRangeFunctor< 4 > *functor)
Exécute une boucle 4D.
static void executeParallelFor(const ComplexForLoopRanges< 2 > &loop_ranges, const ParallelLoopOptions &options, IMDRangeFunctor< 2 > *functor)
Exécute une boucle 2D.
static void setDefaultParallelLoopOptions(const ParallelLoopOptions &v)
Positionne les valeurs par défaut d'exécution d'une boucle parallèle.
static void executeParallelFor(Integer begin, Integer size, const ParallelLoopOptions &options, IRangeFunctor *f)
Exécute le fonctor f en concurrence.
static void executeParallelFor(Integer begin, Integer size, Integer block_size, IRangeFunctor *f)
Exécute le fonctor f en concurrence.
static void executeParallelFor(const ComplexForLoopRanges< 3 > &loop_ranges, const ParallelLoopOptions &options, IMDRangeFunctor< 3 > *functor)
Exécute une boucle 3D.
static ITask * createChildTask(ITask *parent_task, InstanceType *instance, void(InstanceType::*function)(const TaskContext &tc))
Créé une tâche fille.
static Int32 currentTaskThreadIndex()
Indice (entre 0 et nbAllowedThread()-1) du thread exécutant la tâche actuelle.
static ITask * createChildTask(ITask *parent_task, InstanceType *instance, void(InstanceType::*function)())
Créé une tâche fille.
static bool isActive()
Indique si les tâches sont actives. Les tâches sont actives si une implémentation est disponible et s...
static ITask * createTask(InstanceType *instance, void(InstanceType::*function)(const TaskContext &tc))
Créé une tâche. Lors de l'exécution, la tâche appellera la méthode function via l'instance instance.
static void printInfos(std::ostream &o)
Affiche les informations sur l'implémentation.
Fonctor pour une tâche prenant un TaskContext en argument.
void executeFunctor(const TaskContext &tc) override
Exécute la méthode associé
Fonctor sans argument pour une tâche.
void executeFunctor(const TaskContext &) override
Exécute la méthode associé
void arcaneParallelFor(Integer i0, Integer size, InstanceType *itype, void(InstanceType::*lambda_function)(Integer i0, Integer size))
Applique en concurrence la fonction lambda lambda_function sur l'intervalle d'itération [i0,...
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
void arcaneSequentialFor(LoopBoundType< 1, IndexType > bounds, const Lambda &func, ReducerArgs... reducer_args)
Applique le fonctor func sur une boucle 1D.
Type
Type of JSON value.
Definition rapidjson.h:665