Arcane  v4.1.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
ConcurrencyUtils.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ConcurrencyUtils.cc (C) 2000-2025 */
9/* */
10/* Classes gérant la concurrence (tâches, boucles parallèles, ...) */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arccore/concurrency/ITaskImplementation.h"
15
16#include "arccore/concurrency/Task.h"
17#include "arccore/concurrency/ParallelFor.h"
18#include "arccore/concurrency/internal/TaskFactoryInternal.h"
19
20#include "arccore/base/Observable.h"
21
22#include <mutex>
23
24/*---------------------------------------------------------------------------*/
25/*---------------------------------------------------------------------------*/
26
27namespace Arcane
28{
29
30/*---------------------------------------------------------------------------*/
31/*---------------------------------------------------------------------------*/
32
33class SerialTask
34: public ITask
35{
36 public:
37
38 typedef TaskFunctor<SerialTask> TaskType;
39
40 public:
41
42 static const int FUNCTOR_CLASS_SIZE = sizeof(TaskType);
43
44 public:
45
46 SerialTask(ITaskFunctor* f)
47 : m_functor(f)
48 {
49 // \a f doit être une instance de TaskFunctor<SerialTask>.
50 // on recopie dans un buffer pré-dimensionné pour éviter
51 // d'avoir à faire une allocation sur le tas via le new
52 // classique. On utilise donc le new avec placement.
53
54 m_functor = f->clone(functor_buf, FUNCTOR_CLASS_SIZE);
55 }
56
57 public:
58
59 void launchAndWait() override
60 {
61 if (m_functor) {
62 ITaskFunctor* tmp_f = m_functor;
63 m_functor = nullptr;
64 TaskContext task_context(this);
65 tmp_f->executeFunctor(task_context);
66 delete this;
67 }
68 }
70 {
71 for (Integer i = 0, n = tasks.size(); i < n; ++i)
72 tasks[i]->launchAndWait();
73 }
74 ITask* _createChildTask(ITaskFunctor* functor) override
75 {
76 return new SerialTask(functor);
77 }
78
79 private:
80
81 ITaskFunctor* m_functor;
82 char functor_buf[FUNCTOR_CLASS_SIZE];
83};
84
85/*---------------------------------------------------------------------------*/
86/*---------------------------------------------------------------------------*/
87
90{
91 public:
92
93 static NullTaskImplementation singleton;
94
95 public:
96
97 void initialize([[maybe_unused]] Int32 nb_thread) override
98 {
99 }
100 void terminate() override
101 {
102 }
104 {
105 return new SerialTask(f);
106 }
108 [[maybe_unused]] Integer block_size, IRangeFunctor* f) override
109 {
110 f->executeFunctor(begin, size);
111 }
113 [[maybe_unused]] const ParallelLoopOptions& options,
114 IRangeFunctor* f) override
115 {
116 f->executeFunctor(begin, size);
117 }
118 void executeParallelFor(Integer begin, Integer size, IRangeFunctor* f) override
119 {
120 f->executeFunctor(begin, size);
121 }
122 void executeParallelFor(const ParallelFor1DLoopInfo& loop_info) override
123 {
124 loop_info.functor()->executeFunctor(loop_info.beginIndex(), loop_info.size());
125 }
127 [[maybe_unused]] const ForLoopRunInfo& run_info,
128 IMDRangeFunctor<1>* functor) override
129 {
130 functor->executeFunctor(loop_ranges);
131 }
133 [[maybe_unused]] const ForLoopRunInfo& run_info,
134 IMDRangeFunctor<2>* functor) override
135 {
136 functor->executeFunctor(loop_ranges);
137 }
139 [[maybe_unused]] const ForLoopRunInfo& run_info,
140 IMDRangeFunctor<3>* functor) override
141 {
142 functor->executeFunctor(loop_ranges);
143 }
145 [[maybe_unused]] const ForLoopRunInfo& run_info,
146 IMDRangeFunctor<4>* functor) override
147 {
148 functor->executeFunctor(loop_ranges);
149 }
150 bool isActive() const override
151 {
152 return false;
153 }
155 {
156 return 0;
157 }
158 Int32 currentTaskIndex() const override
159 {
160 return 0;
161 }
162
163 void printInfos(std::ostream& o) const final
164 {
165 o << "NullTaskImplementation";
166 }
167};
168
169/*---------------------------------------------------------------------------*/
170/*---------------------------------------------------------------------------*/
171
172NullTaskImplementation NullTaskImplementation::singleton;
173ITaskImplementation* TaskFactory::m_impl = &NullTaskImplementation::singleton;
174Int32 TaskFactory::m_verbose_level = 0;
175
176/*---------------------------------------------------------------------------*/
177/*---------------------------------------------------------------------------*/
178
179namespace
180{
181 IObservable* global_created_thread_observable = 0;
182 IObservable* global_destroyed_thread_observable = 0;
183 std::mutex global_observable_mutex;
184
185 IObservable*
186 _checkCreateGlobalThreadObservable()
187 {
188 if (!global_created_thread_observable)
189 global_created_thread_observable = new Observable();
190 return global_created_thread_observable;
191 }
192} // namespace
193
194/*---------------------------------------------------------------------------*/
195/*---------------------------------------------------------------------------*/
196
197void TaskFactoryInternal::
198setImplementation(ITaskImplementation* task_impl)
199{
200 if (TaskFactory::m_impl && TaskFactory::m_impl != &NullTaskImplementation::singleton)
201 ARCCORE_FATAL("TaskFactory already has an implementation");
202 TaskFactory::m_impl = task_impl;
203}
204
207{
208 std::scoped_lock slock(global_observable_mutex);
209 _checkCreateGlobalThreadObservable();
210 global_created_thread_observable->attachObserver(o);
211}
212
215{
216 std::scoped_lock slock(global_observable_mutex);
217 _checkCreateGlobalThreadObservable();
218 global_created_thread_observable->detachObserver(o);
219}
220
223{
224 std::scoped_lock slock(global_observable_mutex);
225 if (global_created_thread_observable)
226 global_created_thread_observable->notifyAllObservers();
227}
228
229/*---------------------------------------------------------------------------*/
230/*---------------------------------------------------------------------------*/
231
232/*---------------------------------------------------------------------------*/
233/*---------------------------------------------------------------------------*/
234
237{
238 TaskFactoryInternal::setImplementation(task_impl);
239}
240
241/*---------------------------------------------------------------------------*/
242/*---------------------------------------------------------------------------*/
243
246{
247 std::scoped_lock slock(global_observable_mutex);
248 return _checkCreateGlobalThreadObservable();
249}
250
251/*---------------------------------------------------------------------------*/
252/*---------------------------------------------------------------------------*/
253
256{
257 if (!global_destroyed_thread_observable)
258 global_destroyed_thread_observable = new Observable();
259 return global_destroyed_thread_observable;
260}
261
262/*---------------------------------------------------------------------------*/
263/*---------------------------------------------------------------------------*/
264
266terminate()
267{
268 // C'est celui qui a positionné l'implémentation qui gère sa destruction.
269 if (m_impl == &NullTaskImplementation::singleton)
270 return;
271 if (m_impl)
272 m_impl->terminate();
273 m_impl = &NullTaskImplementation::singleton;
274}
275
276/*---------------------------------------------------------------------------*/
277/*---------------------------------------------------------------------------*/
278
279} // namespace Arcane
280
281/*---------------------------------------------------------------------------*/
282/*---------------------------------------------------------------------------*/
290/*---------------------------------------------------------------------------*/
291/*---------------------------------------------------------------------------*/
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Informations d'exécution d'une boucle.
Interface d'un fonctor sur un interval d'itération multi-dimensionnel de dimension RankValue.
Interface d'un fonctor sur un interval d'itération.
virtual void executeFunctor(Int32 begin, Int32 size)=0
Exécute la méthode associée.
Interface d'un fonctor pour une tâche.
Definition Task.h:71
virtual void executeFunctor(const TaskContext &tc)=0
Exécute la méthode associé
Implémentation d'une fabrique de tâches.
Interface d'une tâche concourante.
Definition Task.h:186
void executeParallelFor(const ComplexForLoopRanges< 4 > &loop_ranges, const ForLoopRunInfo &run_info, IMDRangeFunctor< 4 > *functor) override
Exécute une boucle 4D en concurrence.
void printInfos(std::ostream &o) const final
Affiche les informations sur le runtime utilisé
Int32 currentTaskIndex() const override
Implémentation de TaskFactory::currentTaskIndex()
void executeParallelFor(Integer begin, Integer size, IRangeFunctor *f) override
Exécute le fonctor f en concurrence.
void executeParallelFor(const ComplexForLoopRanges< 3 > &loop_ranges, const ForLoopRunInfo &run_info, IMDRangeFunctor< 3 > *functor) override
Exécute une boucle 3D en concurrence.
void executeParallelFor(const ComplexForLoopRanges< 1 > &loop_ranges, const ForLoopRunInfo &run_info, IMDRangeFunctor< 1 > *functor) override
Exécute une boucle 1D en concurrence.
void executeParallelFor(const ComplexForLoopRanges< 2 > &loop_ranges, const ForLoopRunInfo &run_info, IMDRangeFunctor< 2 > *functor) override
Exécute une boucle 2D en concurrence.
void executeParallelFor(const ParallelFor1DLoopInfo &loop_info) override
Exécute la boucle loop_info en concurrence.
ITask * createRootTask(ITaskFunctor *f) override
Créé une tâche racine. L'implémentation doit recopier la valeur de f qui est soit un TaskFunctor,...
bool isActive() const override
Indique si l'implémentation est active.
void executeParallelFor(Integer begin, Integer size, Integer block_size, IRangeFunctor *f) override
Exécute le fonctor f en concurrence.
Int32 currentTaskThreadIndex() const override
Implémentation de TaskFactory::currentTaskThreadIndex()
void executeParallelFor(Integer begin, Integer size, const ParallelLoopOptions &options, IRangeFunctor *f) override
Exécute le fonctor f en concurrence.
void initialize(Int32 nb_thread) override
Classe de base d'un observable.
Caractéristiques d'un boucle 1D multi-thread.
Definition ParallelFor.h:34
Options d'exécution d'une boucle parallèle en multi-thread.
void launchAndWait() override
Lance la tâche et bloque jusqu'à ce qu'elle se termine.
void launchAndWait(ConstArrayView< ITask * > tasks) override
Lance les tâches filles tasks et bloque jusqu'à ce qu'elles se terminent.
Contexte d'éxecution d'une tâche.
Definition Task.h:46
static void removeThreadCreateObserver(IObserver *o)
Supprime un observateur pour la création de thread.
static void notifyThreadCreated()
Notifie tous les observateurs de création de thread.
static void addThreadCreateObserver(IObserver *o)
Ajoute un observateur pour la création de thread.
static IObservable * destroyThreadObservable()
Observable appelé lors de la destruction d'un thread pour une tâche.
static void terminate()
Indique qu'on n'utilisera plus les threads. Cette méthode ne doit pas être appelée lorsque des tâches...
static IObservable * createThreadObservable()
Observable appelé lors de la création d'un thread pour une tâche.
static void _internalSetImplementation(ITaskImplementation *task_impl)
Fonctor sans argument pour une tâche.
Definition Task.h:98
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Int32 Integer
Type représentant un entier.
std::int32_t Int32
Type entier signé sur 32 bits.