14#include "arcane/utils/IThreadImplementation.h"
15#include "arcane/utils/NotImplementedException.h"
16#include "arcane/utils/IFunctor.h"
17#include "arcane/utils/CheckedConvert.h"
18#include "arcane/utils/ForLoopRanges.h"
20#include "arcane/utils/IObservable.h"
21#include "arcane/utils/PlatformUtils.h"
22#include "arcane/utils/Profiling.h"
23#include "arcane/utils/MemoryAllocator.h"
25#include "arcane/FactoryService.h"
32#define TBB_PREVIEW_BLOCKED_RANGE_ND 1
39#ifdef ARCANE_USE_ONETBB
42#define TBB_PREVIEW_WAITING_FOR_WORKERS 1
44#include <oneapi/tbb/concurrent_set.h>
45#include <oneapi/tbb/global_control.h>
51#include <tbb/blocked_rangeNd.h>
55#if (TBB_VERSION_MAJOR < 4) || (TBB_VERSION_MAJOR==4 && TBB_VERSION_MINOR<2)
59#if (TBB_VERSION_MAJOR<2018)
64#error "Your version of TBB is tool old. TBB 2018+ is required. Please disable TBB in configuration"
78class TBBTaskImplementation;
105 explicit ScopedExecInfo(
const ForLoopRunInfo& run_info)
106 : m_run_info(run_info)
112 ForLoopOneExecStat* ptr = run_info.execStat();
114 m_stat_info_ptr = ptr;
115 m_use_own_run_info =
false;
118 m_stat_info_ptr = isStatActive() ? &m_stat_info :
nullptr;
122 if (m_stat_info_ptr && m_use_own_run_info)
128 ForLoopOneExecStat* statInfo()
const {
return m_stat_info_ptr; }
129 bool isOwn()
const {
return m_use_own_run_info; }
132 ForLoopOneExecStat m_stat_info;
133 ForLoopOneExecStat* m_stat_info_ptr =
nullptr;
134 ForLoopRunInfo m_run_info;
136 bool m_use_own_run_info =
true;
142inline int _currentTaskTreadIndex()
148 return tbb::this_task_arena::current_thread_index();
151inline tbb::blocked_rangeNd<Int32,1>
152_toTBBRange(
const ComplexForLoopRanges<1>& r)
154 return {{r.lowerBound<0>(), r.upperBound<0>()}};
157inline tbb::blocked_rangeNd<Int32,2>
158_toTBBRange(
const ComplexForLoopRanges<2>& r)
160 return {{r.lowerBound<0>(), r.upperBound<0>()},
161 {r.lowerBound<1>(), r.upperBound<1>()}};
165inline tbb::blocked_rangeNd<Int32,3>
166_toTBBRange(
const ComplexForLoopRanges<3>& r)
168 return {{r.lowerBound<0>(), r.upperBound<0>()},
169 {r.lowerBound<1>(), r.upperBound<1>()},
170 {r.lowerBound<2>(), r.upperBound<2>()}};
173inline tbb::blocked_rangeNd<Int32,4>
174_toTBBRange(
const ComplexForLoopRanges<4>& r)
176 return {{r.lowerBound<0>(), r.upperBound<0>()},
177 {r.lowerBound<1>(), r.upperBound<1>()},
178 {r.lowerBound<2>(), r.upperBound<2>()},
179 {r.lowerBound<3>(), r.upperBound<3>()}};
185inline tbb::blocked_rangeNd<Int32,2>
186_toTBBRangeWithGrain(
const tbb::blocked_rangeNd<Int32,2>& r,std::size_t grain_size)
188 return {{r.dim(0).begin(), r.dim(0).end(), grain_size},
189 {r.dim(1).begin(), r.dim(1).end()}};
192inline tbb::blocked_rangeNd<Int32,3>
193_toTBBRangeWithGrain(
const tbb::blocked_rangeNd<Int32,3>& r,std::size_t grain_size)
195 return {{r.dim(0).begin(), r.dim(0).end(), grain_size},
196 {r.dim(1).begin(), r.dim(0).end()},
197 {r.dim(2).begin(), r.dim(0).end()}};
200inline tbb::blocked_rangeNd<Int32,4>
201_toTBBRangeWithGrain(
const tbb::blocked_rangeNd<Int32,4>& r,std::size_t grain_size)
203 return {{r.dim(0).begin(), r.dim(0).end(), grain_size},
204 {r.dim(1).begin(), r.dim(1).end()},
205 {r.dim(2).begin(), r.dim(2).end()},
206 {r.dim(3).begin(), r.dim(3).end()}};
212inline ComplexForLoopRanges<2>
213_fromTBBRange(
const tbb::blocked_rangeNd<Int32,2>& r)
215 using BoundsType = ArrayBounds<MDDim2>;
216 using ArrayExtentType =
typename BoundsType::ArrayExtentType;
218 BoundsType lower_bounds(ArrayExtentType(r.dim(0).begin(),r.dim(1).begin()));
219 auto s0 =
static_cast<Int32>(r.dim(0).size());
220 auto s1 =
static_cast<Int32>(r.dim(1).size());
221 BoundsType sizes(ArrayExtentType(s0,s1));
222 return { lower_bounds, sizes };
225inline ComplexForLoopRanges<3>
226_fromTBBRange(
const tbb::blocked_rangeNd<Int32,3> & r)
228 using BoundsType = ArrayBounds<MDDim3>;
229 using ArrayExtentType =
typename BoundsType::ArrayExtentType;
231 BoundsType lower_bounds(ArrayExtentType(r.dim(0).begin(),r.dim(1).begin(),r.dim(2).begin()));
232 auto s0 =
static_cast<Int32>(r.dim(0).size());
233 auto s1 =
static_cast<Int32>(r.dim(1).size());
234 auto s2 =
static_cast<Int32>(r.dim(2).size());
235 BoundsType sizes(ArrayExtentType(s0,s1,s2));
236 return { lower_bounds, sizes };
239inline ComplexForLoopRanges<4>
240_fromTBBRange(
const tbb::blocked_rangeNd<Int32,4>& r)
242 using BoundsType = ArrayBounds<MDDim4>;
243 using ArrayExtentType =
typename BoundsType::ArrayExtentType;
245 BoundsType lower_bounds(ArrayExtentType(r.dim(0).begin(),r.dim(1).begin(),r.dim(2).begin(),r.dim(3).begin()));
246 auto s0 =
static_cast<Int32>(r.dim(0).size());
247 auto s1 =
static_cast<Int32>(r.dim(1).size());
248 auto s2 =
static_cast<Int32>(r.dim(2).size());
249 auto s3 =
static_cast<Int32>(r.dim(3).size());
250 BoundsType sizes(ArrayExtentType(s0,s1,s2,s3));
251 return { lower_bounds, sizes };
259#ifdef ARCANE_USE_ONETBB
264class OneTBBTaskFunctor
267 OneTBBTaskFunctor(ITaskFunctor* functor,ITask* task)
268 : m_functor(functor), m_task(task) {}
270 void operator()()
const
273 ITaskFunctor* tf = m_functor;
275 TaskContext task_context(m_task);
277 tf->executeFunctor(task_context);
281 mutable ITaskFunctor* m_functor;
292 static const int FUNCTOR_CLASS_SIZE = 32;
294 OneTBBTask(ITaskFunctor* f)
297 m_functor = f->clone(functor_buf,FUNCTOR_CLASS_SIZE);
300 OneTBBTaskFunctor taskFunctor() {
return OneTBBTaskFunctor(m_functor,
this); }
301 void launchAndWait()
override;
302 void launchAndWait(ConstArrayView<ITask*> tasks)
override;
304 virtual ITask* _createChildTask(ITaskFunctor* functor)
override;
306 ITaskFunctor* m_functor;
307 char functor_buf[FUNCTOR_CLASS_SIZE];
309using TBBTask = OneTBBTask;
324 static const int FUNCTOR_CLASS_SIZE = 32;
329 m_functor = f->clone(functor_buf,FUNCTOR_CLASS_SIZE);
332 tbb::task* execute()
override
349 char functor_buf[FUNCTOR_CLASS_SIZE];
370 template<
int RankValue>
381 void setTaskIndex(Integer v) { m_task_index = v; }
382 Integer taskIndex()
const {
return m_task_index; }
384 Integer m_task_index;
397 : m_tti(
tti), m_old_task_index(-1)
400 m_old_task_index =
tti->taskIndex();
407 m_tti->setTaskIndex(m_old_task_index);
411 Integer m_old_task_index;
428#ifdef ARCANE_USE_ONETBB
438 void executeParallelFor(Int32 begin,Int32 size,
IRangeFunctor* f)
final
442 void executeParallelFor(
const ParallelFor1DLoopInfo& loop_info)
override;
515 :
public tbb::task_scheduler_observer
520#ifdef ARCANE_USE_ONETBB
521 tbb::task_scheduler_observer(
p->m_main_arena),
526 void on_scheduler_entry(
bool is_worker)
override
530 void on_scheduler_exit(
bool is_worker)
override
539 m_task_observer(this),
542#ifdef ARCANE_USE_ONETBB
543 m_nb_allowed_thread = tbb::info::default_concurrency();
545 m_nb_allowed_thread = tbb::task_scheduler_init::default_num_threads();
555 m_task_observer(this),
563 TaskThreadInfo* threadTaskInfo(Integer index) {
return &m_thread_task_infos[index]; }
565 Int32 m_nb_allowed_thread;
570 for(
auto x : m_sub_arena_list ){
575 m_sub_arena_list.clear();
576 m_main_arena.terminate();
577#ifdef ARCANE_USE_ONETBB
578 m_task_observer.observe(
false);
579 oneapi::tbb::finalize(m_task_scheduler_handle);
581 m_scheduler_init.terminate();
582 m_task_observer.observe(
false);
586 void notifyThreadCreated(
bool is_worker)
588 std::thread::id my_thread_id = std::this_thread::get_id();
590#ifdef ARCANE_USE_ONETBB
597 if (m_constructed_thread_map.contains(my_thread_id))
599 m_constructed_thread_map.insert(my_thread_id);
606 std::scoped_lock sl(m_thread_created_mutex);
608 std::cout <<
"TBB: CREATE THREAD"
609 <<
" nb_allowed=" << m_nb_allowed_thread
610#ifdef ARCANE_USE_ONETBB
611 <<
" tbb_default_allowed=" << tbb::info::default_concurrency()
613 <<
" tbb_default_allowed=" << tbb::task_scheduler_init::default_num_threads()
615 <<
" id=" << my_thread_id
616 <<
" arena_id=" << _currentTaskTreadIndex()
617 <<
" is_worker=" << is_worker
624 void notifyThreadDestroyed([[maybe_unused]]
bool is_worker)
626#ifdef ARCANE_USE_ONETBB
636 std::scoped_lock sl(m_thread_created_mutex);
638 std::cout <<
"TBB: DESTROY THREAD"
639 <<
" id=" << std::this_thread::get_id()
640 <<
" arena_id=" << _currentTaskTreadIndex()
641 <<
" is_worker=" << is_worker
648#ifdef ARCANE_USE_ONETBB
649#if TBB_VERSION_MAJOR>2021 || (TBB_VERSION_MAJOR==2021 && TBB_VERSION_MINOR>5)
650 oneapi::tbb::task_scheduler_handle m_task_scheduler_handle = oneapi::tbb::attach();
652 oneapi::tbb::task_scheduler_handle m_task_scheduler_handle = tbb::task_scheduler_handle::get();
655 tbb::task_scheduler_init m_scheduler_init;
658 tbb::task_arena m_main_arena;
663 std::mutex m_thread_created_mutex;
665#ifdef ARCANE_USE_ONETBB
671 std::cout <<
"TBB: TBBTaskImplementationInit nb_allowed_thread=" << m_nb_allowed_thread
672 <<
" id=" << std::this_thread::get_id()
676 m_thread_task_infos.resize(m_nb_allowed_thread);
677 m_task_observer.observe(
true);
684 m_sub_arena_list[0] = m_sub_arena_list[1] =
nullptr;
686 m_sub_arena_list[i] =
new tbb::task_arena(i);
702 void operator()(tbb::blocked_range<Integer>&
range)
const
705 if (TaskFactory::verboseLevel()>=3){
706 std::ostringstream o;
707 o <<
"TBB: INDEX=" << TaskFactory::currentTaskThreadIndex()
708 <<
" id=" << std::this_thread::get_id()
709 <<
" max_allowed=" << m_nb_allowed_thread
710 <<
" range_begin=" <<
range.begin() <<
" range_size=" <<
range.size()
712 std::cout << o.str();
718 ARCANE_FATAL(
"Invalid index for thread idx={0} valid_interval=[0..{1}[",
723 m_stat_info->incrementNbChunk();
724 m_functor->executeFunctor(
range.begin(),CheckedConvert::toInteger(
range.size()));
730 Int32 m_nb_allowed_thread;
738template<
int RankValue>
748 void operator()(tbb::blocked_rangeNd<Int32,RankValue>&
range)
const
751 if (TaskFactory::verboseLevel()>=3){
752 std::ostringstream o;
753 o <<
"TBB: INDEX=" << TaskFactory::currentTaskThreadIndex()
754 <<
" id=" << std::this_thread::get_id()
755 <<
" max_allowed=" << m_nb_allowed_thread
758 std::cout << o.str();
764 ARCANE_FATAL(
"Invalid index for thread idx={0} valid_interval=[0..{1}[",
769 m_stat_info->incrementNbChunk();
777 Int32 m_nb_allowed_thread;
805 m_grain_size(
grain_size), m_nb_block(0), m_block_size(0), m_nb_block_per_thread(0)
811 m_block_size = m_grain_size;
813 m_nb_block = m_size / m_block_size;
814 if ((m_size % m_block_size)!=0)
819 m_nb_block_per_thread = m_nb_block / m_nb_thread;
820 if ((m_nb_block % m_nb_thread) != 0)
821 ++m_nb_block_per_thread;
825 m_nb_block = m_nb_thread;
826 m_block_size = m_size / m_nb_block;
827 m_nb_block_per_thread = 1;
829 if (TaskFactory::verboseLevel()>=2){
830 std::cout <<
"TBBDeterministicParallelFor: BEGIN=" << m_begin_index <<
" size=" << m_size
831 <<
" grain_size=" << m_grain_size
832 <<
" nb_block=" << m_nb_block <<
" nb_thread=" << m_nb_thread
833 <<
" nb_block_per_thread=" << m_nb_block_per_thread
834 <<
" block_size=" << m_block_size
835 <<
" block_size*nb_block=" << m_block_size*m_nb_block <<
'\n';
849 for( Integer i=0; i<
nb_iter; ++i ){
851 for ( Integer
k=0,
kn=m_nb_block_per_thread;
k<
kn; ++
k ){
869 iter_begin += m_begin_index;
871 if (TaskFactory::verboseLevel()>=3){
872 std::ostringstream o;
873 o <<
"TBB: DoBlock: BLOCK task_id=" << task_id <<
" block_id=" << block_id
874 <<
" iter_begin=" << iter_begin <<
" iter_size=" << iter_size <<
'\n';
875 std::cout << o.str();
880 auto r = tbb::blocked_range<int>(iter_begin,iter_begin + iter_size);
887 TBBTaskImplementation* m_impl;
888 const TBBParallelFor& m_tbb_for;
907 : m_impl(impl), m_begin(begin), m_size(size), m_functor(f), m_options(options), m_stat_info(
stat_info){}
911 void operator()()
const
913 Integer
nb_thread = m_options.maxThread();
915 Integer
gsize = m_options.grainSize();
916 tbb::blocked_range<Integer>
range(m_begin,m_begin+m_size);
917 if (TaskFactory::verboseLevel()>=1)
918 std::cout <<
"TBB: TBBTaskImplementationInit ParallelForExecute begin=" << m_begin
919 <<
" size=" << m_size <<
" gsize=" <<
gsize
920 <<
" partitioner=" << (
int)m_options.partitioner()
924 range = tbb::blocked_range<Integer>(m_begin,m_begin+m_size,
gsize);
926 if (m_options.partitioner()==ParallelLoopOptions::Partitioner::Static){
927 tbb::parallel_for(
range,
pf,tbb::static_partitioner());
929 else if (m_options.partitioner()==ParallelLoopOptions::Partitioner::Deterministic){
949template<
int RankValue>
958 : m_impl(impl), m_tbb_range(
_toTBBRange(
range)), m_functor(f), m_options(options)
963 Integer
gsize = m_options.grainSize();
973 void operator()()
const
975 Integer
nb_thread = m_options.maxThread();
978 if (m_options.partitioner()==ParallelLoopOptions::Partitioner::Static){
979 tbb::parallel_for(m_tbb_range,
pf,tbb::static_partitioner());
981 else if (m_options.partitioner()==ParallelLoopOptions::Partitioner::Deterministic){
989 tbb::parallel_for(m_tbb_range,
pf);
993 tbb::blocked_rangeNd<Int32,RankValue> m_tbb_range;
1002TBBTaskImplementation::
1003~TBBTaskImplementation()
1041 return m_p->nbAllowedThread();
1050#ifdef ARCANE_USE_ONETBB
1051 o <<
"OneTBBTaskImplementation"
1056 o <<
"TBBTaskImplementation"
1065void TBBTaskImplementation::
1083 std::cout <<
"TBB: TBBTaskImplementation executeParallelFor begin=" << begin
1084 <<
" size=" << size <<
" max_thread=" <<
max_thread
1085 <<
" grain_size=" << options.
grainSize()
1095 ParallelLoopOptions true_options(options);
1097 true_options.setMaxThread(max_thread);
1099 ParallelForExecute pfe(
this,true_options,begin,size,f,stat_info);
1101 tbb::task_arena* used_arena =
nullptr;
1102 if (max_thread<nb_allowed_thread)
1103 used_arena = m_p->m_sub_arena_list[max_thread];
1105 used_arena = &(m_p->m_main_arena);
1106 used_arena->execute(pfe);
1112void TBBTaskImplementation::
1135 std::cout <<
"TBB: TBBTaskImplementation executeMDParallelFor nb_dim=" <<
RankValue <<
'\n';
1159 auto x1 = [&](Integer begin,Integer size)
1179void TBBTaskImplementation::
1191void TBBTaskImplementation::
1192executeParallelFor(Integer begin,Integer size,
const ParallelLoopOptions& options,IRangeFunctor* f)
1194 executeParallelFor(ParallelFor1DLoopInfo(begin,size,f,ForLoopRunInfo(options)));
1216#ifdef ARCANE_USE_ONETBB
1232#ifdef ARCANE_USE_ONETBB
1250launchAndWait(ConstArrayView<ITask*> tasks)
1252 tbb::task_group task_group;
1253 Integer n = tasks.size();
1258 for( Integer i=0; i<n; ++i ){
1259 OneTBBTask* t =
static_cast<OneTBBTask*
>(tasks[i]);
1260 task_group.run(t->taskFunctor());
1263 for( Integer i=0; i<n; ++i ){
1264 OneTBBTask* t =
static_cast<OneTBBTask*
>(tasks[i]);
1273_createChildTask(ITaskFunctor* functor)
1275 OneTBBTask* t =
new OneTBBTask(functor);
1290 task::spawn_root_and_wait(*
this);
1299 Integer n =
tasks.size();
1304 for( Integer i=0; i<n-1; ++i ){
1314ITask* LegacyTBBTask::
1330 TBBTaskImplementation);
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formatage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCANE_ALIGNAS_PACKED(value)
Macro pour garantir le compactage et l'alignement d'une classe sur value octets.
Classes, Types et macros pour gérer la concurrence.
#define ARCANE_REGISTER_APPLICATION_FACTORY(aclass, ainterface, aname)
Enregistre un service de fabrique pour la classe aclass.
Intervalle d'itération complexe.
Classe pour gérer le profiling d'une seule exécution d'une boucle.
Intervalle d'itération pour une boucle.
Informations d'exécution d'une boucle.
Interface d'un fonctor sur un intervalle d'itération multi-dimensionnel de dimension RankValue.
Interface d'un fonctor sur un intervalle d'itération.
virtual void executeFunctor(Integer begin, Integer size)=0
Exécute la méthode associée.
Interface d'un fonctor pour une tâche.
Implémentation d'une fabrique de tâches.
Interface d'une tâche concurrente.
Fonctor sur un intervalle d'itération instancié via une fonction lambda.
void launchAndWait() override
Lance la tâche et bloque jusqu'à ce qu'elle se termine.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Caractéristiques d'une boucle 1D multi-thread.
Options d'exécution d'une boucle parallèle en multi-thread.
Integer grainSize() const
Taille d'un intervalle d'itération.
Int32 maxThread() const
Nombre maximal de threads autorisés.
static impl::ForLoopStatInfoList * _threadLocalForLoopInstance()
static bool hasProfiling()
Indique si le profilage est actif.
Structure contenant les informations pour créer un service.
Implémentation déterministe de ParallelFor.
void operator()(tbb::blocked_range< Integer > &range) const
Opérateur pour un thread donné.
Exécuteur pour une boucle multi-dimension.
Exécuteur pour une boucle 1D.
std::vector< tbb::task_arena * > m_sub_arena_list
Tableau dont le i-ème élément contient la tbb::task_arena pour i thread.
Classe pour positionner TaskThreadInfo::taskIndex().
void executeParallelFor(const ComplexForLoopRanges< 1 > &loop_ranges, const ParallelLoopOptions &options, IMDRangeFunctor< 1 > *functor) final
Exécute une boucle 1D en concurrence.
Int32 currentTaskThreadIndex() const final
Implémentation de TaskFactory::currentTaskThreadIndex()
void initialize(Int32 nb_thread) override
void executeParallelFor(const ComplexForLoopRanges< 4 > &loop_ranges, const ParallelLoopOptions &options, IMDRangeFunctor< 4 > *functor) final
Exécute une boucle 4D en concurrence.
void executeParallelFor(const ComplexForLoopRanges< 3 > &loop_ranges, const ParallelLoopOptions &options, IMDRangeFunctor< 3 > *functor) final
Exécute une boucle 3D en concurrence.
void executeParallelFor(const ComplexForLoopRanges< 2 > &loop_ranges, const ParallelLoopOptions &options, IMDRangeFunctor< 2 > *functor) final
Exécute une boucle 2D en concurrence.
ITask * createRootTask(ITaskFunctor *f) override
Crée une tâche racine. L'implémentation doit recopier la valeur de f qui est soit un TaskFunctor,...
void printInfos(std::ostream &o) const final
Affiche les informations sur le runtime utilisé
void _executeMDParallelFor(const ComplexForLoopRanges< RankValue > &loop_ranges, IMDRangeFunctor< RankValue > *functor, const ParallelLoopOptions &options)
Exécution d'une boucle N-dimensions.
TaskThreadInfo * currentTaskThreadInfo() const
Instance de TaskThreadInfo associé au thread courant.
void terminate() override
bool isActive() const final
Indique si l'implémentation est active.
Int32 currentTaskIndex() const final
Implémentation de TaskFactory::currentTaskIndex()
Int32 nbAllowedThread() const final
Nombre de threads utilisés au maximum pour gérer les tâches.
Contexte d'exécution d'une tâche.
static IObservable * destroyThreadObservable()
Observable appelé lors de la destruction d'un thread pour une tâche.
static const ParallelLoopOptions & defaultParallelLoopOptions()
Valeurs par défaut d'exécution d'une boucle parallèle.
static IObservable * createThreadObservable()
Observable appelé lors de la création d'un thread pour une tâche.
static Integer verboseLevel()
Niveau de verbosité
static void setDefaultParallelLoopOptions(const ParallelLoopOptions &v)
Positionne les valeurs par défaut d'exécution d'une boucle parallèle.
Classe permettant de récupérer le temps passé entre l'appel au constructeur et au destructeur.
Allocateur mémoire avec alignement mémoire spécifique.
Exception lorsqu'une fonction n'est pas implémentée.
Integer toInteger(Real r)
Convertit un Int64 en un Integer.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
SimpleForLoopRanges< 1 > makeLoopRanges(Int32 n1)
Crée un intervalle d'itération [0,n1[.
Int32 Integer
Type représentant un entier.