Arcane  v3.15.0.0
Documentation utilisateur
Chargement...
Recherche...
Aucune correspondance
ConcurrencyUtils.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ConcurrencyUtils.cc (C) 2000-2025 */
9/* */
10/* Classes gérant la concurrence (tâches, boucles parallèles, ...) */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
15
16#include "arcane/utils/internal/TaskFactoryInternal.h"
17#include "arcane/utils/TraceInfo.h"
18#include "arcane/utils/Observable.h"
19
20#include <mutex>
21
22/*---------------------------------------------------------------------------*/
23/*---------------------------------------------------------------------------*/
24
25namespace Arcane
26{
27
28/*---------------------------------------------------------------------------*/
29/*---------------------------------------------------------------------------*/
30
32: public ITask
33{
34 public:
35 typedef TaskFunctor<SerialTask> TaskType;
36 public:
37 static const int FUNCTOR_CLASS_SIZE = sizeof(TaskType);
38 public:
40 : m_functor(f)
41 {
42 // \a f doit être une instance de TaskFunctor<SerialTask>.
43 // on recopie dans un buffer pré-dimensionné pour éviter
44 // d'avoir à faire une allocation sur le tas via le new
45 // classique. On utilise donc le new avec placement.
46
47 m_functor = f->clone(functor_buf,FUNCTOR_CLASS_SIZE);
48 }
49 public:
50 void launchAndWait() override
51 {
52 if (m_functor){
53 ITaskFunctor* tmp_f = m_functor;
54 m_functor = nullptr;
55 TaskContext task_context(this);
56 tmp_f->executeFunctor(task_context);
57 delete this;
58 }
59 }
61 {
62 for( Integer i=0,n=tasks.size(); i<n; ++i )
63 tasks[i]->launchAndWait();
64 }
65 ITask* _createChildTask(ITaskFunctor* functor) override
66 {
67 return new SerialTask(functor);
68 }
69 private:
70 ITaskFunctor* m_functor;
71 char functor_buf[FUNCTOR_CLASS_SIZE];
72};
73
74/*---------------------------------------------------------------------------*/
75/*---------------------------------------------------------------------------*/
76
79{
80 public:
81 static NullTaskImplementation singleton;
82 public:
83 void initialize([[maybe_unused]] Int32 nb_thread) override
84 {
85 }
86 void terminate() override
87 {
88 }
90 {
91 return new SerialTask(f);
92 }
93 void executeParallelFor(Integer begin,Integer size,[[maybe_unused]] Integer block_size,IRangeFunctor* f) override
94 {
95 f->executeFunctor(begin,size);
96 }
97 void executeParallelFor(Integer begin,Integer size,[[maybe_unused]] const ParallelLoopOptions& options,IRangeFunctor* f) override
98 {
99 f->executeFunctor(begin,size);
100 }
101 void executeParallelFor(Integer begin,Integer size,IRangeFunctor* f) override
102 {
103 f->executeFunctor(begin,size);
104 }
105 void executeParallelFor(const ParallelFor1DLoopInfo& loop_info) override
106 {
107 loop_info.functor()->executeFunctor(loop_info.beginIndex(),loop_info.size());
108 }
110 [[maybe_unused]] const ForLoopRunInfo& run_info,
111 IMDRangeFunctor<1>* functor) override
112 {
113 functor->executeFunctor(loop_ranges);
114 }
116 [[maybe_unused]] const ForLoopRunInfo& run_info,
117 IMDRangeFunctor<2>* functor) override
118 {
119 functor->executeFunctor(loop_ranges);
120 }
122 [[maybe_unused]] const ForLoopRunInfo& run_info,
123 IMDRangeFunctor<3>* functor) override
124 {
125 functor->executeFunctor(loop_ranges);
126 }
128 [[maybe_unused]] const ForLoopRunInfo& run_info,
129 IMDRangeFunctor<4>* functor) override
130 {
131 functor->executeFunctor(loop_ranges);
132 }
133 bool isActive() const override
134 {
135 return false;
136 }
137 Int32 nbAllowedThread() const override
138 {
139 return 1;
140 }
141 Int32 currentTaskThreadIndex() const override
142 {
143 return 0;
144 }
145 Int32 currentTaskIndex() const override
146 {
147 return 0;
148 }
149
150 void printInfos(std::ostream& o) const final
151 {
152 o << "NullTaskImplementation";
153 }
154};
155
156/*---------------------------------------------------------------------------*/
157/*---------------------------------------------------------------------------*/
158
159NullTaskImplementation NullTaskImplementation::singleton;
160ITaskImplementation* TaskFactory::m_impl = &NullTaskImplementation::singleton;
161Int32 TaskFactory::m_verbose_level = 0;
162ParallelLoopOptions TaskFactory::m_default_loop_options;
163
164/*---------------------------------------------------------------------------*/
165/*---------------------------------------------------------------------------*/
166
167namespace
168{
169
170/*---------------------------------------------------------------------------*/
171/*---------------------------------------------------------------------------*/
172
173IObservable* global_created_thread_observable = 0;
174IObservable* global_destroyed_thread_observable = 0;
175std::mutex global_observable_mutex;
176
177IObservable*
178_checkCreateGlobalThreadObservable()
179{
180 if (!global_created_thread_observable)
181 global_created_thread_observable = new Observable();
182 return global_created_thread_observable;
183}
184
185/*---------------------------------------------------------------------------*/
186/*---------------------------------------------------------------------------*/
187
188}
189
190/*---------------------------------------------------------------------------*/
191/*---------------------------------------------------------------------------*/
192
193void TaskFactoryInternal::
194setImplementation(ITaskImplementation* task_impl)
195{
196 if (TaskFactory::m_impl && TaskFactory::m_impl!=&NullTaskImplementation::singleton)
197 ARCANE_FATAL("TaskFactory already has an implementation");
198 TaskFactory::m_impl = task_impl;
199}
200
201void TaskFactoryInternal::
202addThreadCreateObserver(IObserver* o)
203{
204 std::scoped_lock slock(global_observable_mutex);
205 _checkCreateGlobalThreadObservable();
206 global_created_thread_observable->attachObserver(o);
207}
208
209void TaskFactoryInternal::
210removeThreadCreateObserver(IObserver* o)
211{
212 std::scoped_lock slock(global_observable_mutex);
213 _checkCreateGlobalThreadObservable();
214 global_created_thread_observable->detachObserver(o);
215}
216
217void TaskFactoryInternal::
218notifyThreadCreated()
219{
220 std::scoped_lock slock(global_observable_mutex);
221 if (global_created_thread_observable)
222 global_created_thread_observable->notifyAllObservers();
223}
224
225/*---------------------------------------------------------------------------*/
226/*---------------------------------------------------------------------------*/
227
228/*---------------------------------------------------------------------------*/
229/*---------------------------------------------------------------------------*/
230
231void TaskFactory::
232_internalSetImplementation(ITaskImplementation* task_impl)
233{
234 TaskFactoryInternal::setImplementation(task_impl);
235}
236
237/*---------------------------------------------------------------------------*/
238/*---------------------------------------------------------------------------*/
239
242{
243 std::scoped_lock slock(global_observable_mutex);
244 return _checkCreateGlobalThreadObservable();
245}
246
247/*---------------------------------------------------------------------------*/
248/*---------------------------------------------------------------------------*/
249
252{
253 if (!global_destroyed_thread_observable)
254 global_destroyed_thread_observable = new Observable();
255 return global_destroyed_thread_observable;
256}
257
258/*---------------------------------------------------------------------------*/
259/*---------------------------------------------------------------------------*/
260
261void TaskFactory::
262terminate()
263{
264 // C'est celui qui a positionné l'implémentation qui gère sa destruction.
265 if (m_impl==&NullTaskImplementation::singleton)
266 return;
267 if (m_impl)
268 m_impl->terminate();
269 m_impl = &NullTaskImplementation::singleton;
270}
271
272/*---------------------------------------------------------------------------*/
273/*---------------------------------------------------------------------------*/
274
275} // End namespace Arcane
276
277/*---------------------------------------------------------------------------*/
278/*---------------------------------------------------------------------------*/
279
280/*!
281 * \file ConcurrencyUtils.h
282
283 \brief Classes, Types et macros pour gérer la concurrence.
284
285 Pour plus de renseignements, se reporter à la page \ref arcanedoc_parallel_concurrency
286*/
287
288/*---------------------------------------------------------------------------*/
289/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Classes, Types et macros pour gérer la concurrence.
Informations d'exécution d'une boucle.
Interface d'un fonctor sur un interval d'itération multi-dimensionnel de dimension RankValue.
virtual void executeFunctor(const ComplexForLoopRanges< RankValue > &loop_range)=0
Exécute la méthode associée.
Interface d'un observable.
Interface d'un fonctor sur un interval d'itération.
virtual void executeFunctor(Integer begin, Integer size)=0
Exécute la méthode associée.
virtual void executeFunctor(const TaskContext &tc)=0
Exécute la méthode associé
Interface d'une tâche concourante.
void executeParallelFor(const ComplexForLoopRanges< 4 > &loop_ranges, const ForLoopRunInfo &run_info, IMDRangeFunctor< 4 > *functor) override
Exécute une boucle 4D en concurrence.
void printInfos(std::ostream &o) const final
Affiche les informations sur le runtime utilisé
Int32 currentTaskIndex() const override
Implémentation de TaskFactory::currentTaskIndex()
Int32 nbAllowedThread() const override
Nombre de threads utilisés au maximum pour gérer les tâches.
void executeParallelFor(Integer begin, Integer size, IRangeFunctor *f) override
Exécute le fonctor f en concurrence.
void executeParallelFor(const ComplexForLoopRanges< 3 > &loop_ranges, const ForLoopRunInfo &run_info, IMDRangeFunctor< 3 > *functor) override
Exécute une boucle 3D en concurrence.
void executeParallelFor(const ComplexForLoopRanges< 1 > &loop_ranges, const ForLoopRunInfo &run_info, IMDRangeFunctor< 1 > *functor) override
Exécute une boucle 1D en concurrence.
void executeParallelFor(const ComplexForLoopRanges< 2 > &loop_ranges, const ForLoopRunInfo &run_info, IMDRangeFunctor< 2 > *functor) override
Exécute une boucle 2D en concurrence.
void executeParallelFor(const ParallelFor1DLoopInfo &loop_info) override
Exécute la boucle loop_info en concurrence.
ITask * createRootTask(ITaskFunctor *f) override
Créé une tâche racine. L'implémentation doit recopier la valeur de f qui est soit un TaskFunctor,...
bool isActive() const override
Indique si l'implémentation est active.
void executeParallelFor(Integer begin, Integer size, Integer block_size, IRangeFunctor *f) override
Exécute le fonctor f en concurrence.
Int32 currentTaskThreadIndex() const override
Implémentation de TaskFactory::currentTaskThreadIndex()
void executeParallelFor(Integer begin, Integer size, const ParallelLoopOptions &options, IRangeFunctor *f) override
Exécute le fonctor f en concurrence.
Classe de base d'un observable.
Caractéristiques d'un boucle 1D multi-thread.
Options d'exécution d'une boucle parallèle en multi-thread.
void launchAndWait() override
Lance la tâche et bloque jusqu'à ce qu'elle se termine.
void launchAndWait(ConstArrayView< ITask * > tasks) override
Lance les tâches filles tasks et bloque jusqu'à ce qu'elles se terminent.
Contexte d'éxecution d'une tâche.
static IObservable * destroyThreadObservable()
Observable appelé lors de la destruction d'un thread pour une tâche.
static IObservable * createThreadObservable()
Observable appelé lors de la création d'un thread pour une tâche.
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-