Arcane  v4.1.0.0
Documentation utilisateur
Chargement...
Recherche...
Aucune correspondance
ConcurrencyUtils.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ConcurrencyUtils.cc (C) 2000-2025 */
9/* */
10/* Classes gérant la concurrence (tâches, boucles parallèles, ...) */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arccore/concurrency/ITaskImplementation.h"
15
16#include "arccore/concurrency/Task.h"
17#include "arccore/concurrency/ParallelFor.h"
18#include "arccore/concurrency/internal/TaskFactoryInternal.h"
19
20#include "arccore/base/Observable.h"
21
22#include <mutex>
23
24/*---------------------------------------------------------------------------*/
25/*---------------------------------------------------------------------------*/
26
27namespace Arcane
28{
29
30/*---------------------------------------------------------------------------*/
31/*---------------------------------------------------------------------------*/
32
33class SerialTask
34: public ITask
35{
36 public:
37
38 typedef TaskFunctor<SerialTask> TaskType;
39
40 public:
41
42 static const int FUNCTOR_CLASS_SIZE = sizeof(TaskType);
43
44 public:
45
46 SerialTask(ITaskFunctor* f)
47 : m_functor(f)
48 {
49 // \a f doit être une instance de TaskFunctor<SerialTask>.
50 // on recopie dans un buffer pré-dimensionné pour éviter
51 // d'avoir à faire une allocation sur le tas via le new
52 // classique. On utilise donc le new avec placement.
53
54 m_functor = f->clone(functor_buf, FUNCTOR_CLASS_SIZE);
55 }
56
57 public:
58
59 void launchAndWait() override
60 {
61 if (m_functor) {
62 ITaskFunctor* tmp_f = m_functor;
63 m_functor = nullptr;
64 TaskContext task_context(this);
65 tmp_f->executeFunctor(task_context);
66 delete this;
67 }
68 }
70 {
71 for (Integer i = 0, n = tasks.size(); i < n; ++i)
72 tasks[i]->launchAndWait();
73 }
74 ITask* _createChildTask(ITaskFunctor* functor) override
75 {
76 return new SerialTask(functor);
77 }
78
79 private:
80
81 ITaskFunctor* m_functor;
82 char functor_buf[FUNCTOR_CLASS_SIZE];
83};
84
85/*---------------------------------------------------------------------------*/
86/*---------------------------------------------------------------------------*/
87
90{
91 public:
92
93 static NullTaskImplementation singleton;
94
95 public:
96
97 void initialize([[maybe_unused]] Int32 nb_thread) override
98 {
99 }
100 void terminate() override
101 {
102 }
104 {
105 return new SerialTask(f);
106 }
108 [[maybe_unused]] Integer block_size, IRangeFunctor* f) override
109 {
110 f->executeFunctor(begin, size);
111 }
113 [[maybe_unused]] const ParallelLoopOptions& options,
114 IRangeFunctor* f) override
115 {
116 f->executeFunctor(begin, size);
117 }
118 void executeParallelFor(Integer begin, Integer size, IRangeFunctor* f) override
119 {
120 f->executeFunctor(begin, size);
121 }
122 void executeParallelFor(const ParallelFor1DLoopInfo& loop_info) override
123 {
124 loop_info.functor()->executeFunctor(loop_info.beginIndex(), loop_info.size());
125 }
127 [[maybe_unused]] const ForLoopRunInfo& run_info,
128 IMDRangeFunctor<1>* functor) override
129 {
130 functor->executeFunctor(loop_ranges);
131 }
133 [[maybe_unused]] const ForLoopRunInfo& run_info,
134 IMDRangeFunctor<2>* functor) override
135 {
136 functor->executeFunctor(loop_ranges);
137 }
139 [[maybe_unused]] const ForLoopRunInfo& run_info,
140 IMDRangeFunctor<3>* functor) override
141 {
142 functor->executeFunctor(loop_ranges);
143 }
145 [[maybe_unused]] const ForLoopRunInfo& run_info,
146 IMDRangeFunctor<4>* functor) override
147 {
148 functor->executeFunctor(loop_ranges);
149 }
150 bool isActive() const override
151 {
152 return false;
153 }
154 Int32 nbAllowedThread() const override
155 {
156 return 1;
157 }
159 {
160 return 0;
161 }
162 Int32 currentTaskIndex() const override
163 {
164 return 0;
165 }
166
167 void printInfos(std::ostream& o) const final
168 {
169 o << "NullTaskImplementation";
170 }
171};
172
173/*---------------------------------------------------------------------------*/
174/*---------------------------------------------------------------------------*/
175
176NullTaskImplementation NullTaskImplementation::singleton;
177ITaskImplementation* TaskFactory::m_impl = &NullTaskImplementation::singleton;
178Int32 TaskFactory::m_verbose_level = 0;
179ParallelLoopOptions TaskFactory::m_default_loop_options;
180
181/*---------------------------------------------------------------------------*/
182/*---------------------------------------------------------------------------*/
183
184namespace
185{
186 IObservable* global_created_thread_observable = 0;
187 IObservable* global_destroyed_thread_observable = 0;
188 std::mutex global_observable_mutex;
189
190 IObservable*
191 _checkCreateGlobalThreadObservable()
192 {
193 if (!global_created_thread_observable)
194 global_created_thread_observable = new Observable();
195 return global_created_thread_observable;
196 }
197} // namespace
198
199/*---------------------------------------------------------------------------*/
200/*---------------------------------------------------------------------------*/
201
202void TaskFactoryInternal::
203setImplementation(ITaskImplementation* task_impl)
204{
205 if (TaskFactory::m_impl && TaskFactory::m_impl != &NullTaskImplementation::singleton)
206 ARCCORE_FATAL("TaskFactory already has an implementation");
207 TaskFactory::m_impl = task_impl;
208}
209
210void TaskFactoryInternal::
211addThreadCreateObserver(IObserver* o)
212{
213 std::scoped_lock slock(global_observable_mutex);
214 _checkCreateGlobalThreadObservable();
215 global_created_thread_observable->attachObserver(o);
216}
217
218void TaskFactoryInternal::
219removeThreadCreateObserver(IObserver* o)
220{
221 std::scoped_lock slock(global_observable_mutex);
222 _checkCreateGlobalThreadObservable();
223 global_created_thread_observable->detachObserver(o);
224}
225
226void TaskFactoryInternal::
227notifyThreadCreated()
228{
229 std::scoped_lock slock(global_observable_mutex);
230 if (global_created_thread_observable)
231 global_created_thread_observable->notifyAllObservers();
232}
233
234/*---------------------------------------------------------------------------*/
235/*---------------------------------------------------------------------------*/
236
237/*---------------------------------------------------------------------------*/
238/*---------------------------------------------------------------------------*/
239
240void TaskFactory::
241_internalSetImplementation(ITaskImplementation* task_impl)
242{
243 TaskFactoryInternal::setImplementation(task_impl);
244}
245
246/*---------------------------------------------------------------------------*/
247/*---------------------------------------------------------------------------*/
248
251{
252 std::scoped_lock slock(global_observable_mutex);
253 return _checkCreateGlobalThreadObservable();
254}
255
256/*---------------------------------------------------------------------------*/
257/*---------------------------------------------------------------------------*/
258
261{
262 if (!global_destroyed_thread_observable)
263 global_destroyed_thread_observable = new Observable();
264 return global_destroyed_thread_observable;
265}
266
267/*---------------------------------------------------------------------------*/
268/*---------------------------------------------------------------------------*/
269
270void TaskFactory::
271terminate()
272{
273 // C'est celui qui a positionné l'implémentation qui gère sa destruction.
274 if (m_impl == &NullTaskImplementation::singleton)
275 return;
276 if (m_impl)
277 m_impl->terminate();
278 m_impl = &NullTaskImplementation::singleton;
279}
280
281/*---------------------------------------------------------------------------*/
282/*---------------------------------------------------------------------------*/
283
284} // namespace Arcane
285
286/*---------------------------------------------------------------------------*/
287/*---------------------------------------------------------------------------*/
288/*!
289 * \file ConcurrencyUtils.h
290 *
291 * \brief Classes, Types et macros pour gérer la concurrence.
292 *
293 * Pour plus de renseignements, se reporter à la page \ref arcanedoc_parallel_concurrency
294 */
295/*---------------------------------------------------------------------------*/
296/*---------------------------------------------------------------------------*/
Vue constante d'un tableau de type T.
constexpr Integer size() const noexcept
Nombre d'éléments du tableau.
Informations d'exécution d'une boucle.
Interface d'un fonctor sur un interval d'itération multi-dimensionnel de dimension RankValue.
Interface d'un fonctor sur un interval d'itération.
virtual void executeFunctor(Int32 begin, Int32 size)=0
Exécute la méthode associée.
virtual void executeFunctor(const TaskContext &tc)=0
Exécute la méthode associé
Interface d'une tâche concourante.
Definition Task.h:187
void executeParallelFor(const ComplexForLoopRanges< 4 > &loop_ranges, const ForLoopRunInfo &run_info, IMDRangeFunctor< 4 > *functor) override
Exécute une boucle 4D en concurrence.
void printInfos(std::ostream &o) const final
Affiche les informations sur le runtime utilisé
Int32 currentTaskIndex() const override
Implémentation de TaskFactory::currentTaskIndex()
Int32 nbAllowedThread() const override
Nombre de threads utilisés au maximum pour gérer les tâches.
void executeParallelFor(Integer begin, Integer size, IRangeFunctor *f) override
Exécute le fonctor f en concurrence.
void executeParallelFor(const ComplexForLoopRanges< 3 > &loop_ranges, const ForLoopRunInfo &run_info, IMDRangeFunctor< 3 > *functor) override
Exécute une boucle 3D en concurrence.
void executeParallelFor(const ComplexForLoopRanges< 1 > &loop_ranges, const ForLoopRunInfo &run_info, IMDRangeFunctor< 1 > *functor) override
Exécute une boucle 1D en concurrence.
void executeParallelFor(const ComplexForLoopRanges< 2 > &loop_ranges, const ForLoopRunInfo &run_info, IMDRangeFunctor< 2 > *functor) override
Exécute une boucle 2D en concurrence.
void executeParallelFor(const ParallelFor1DLoopInfo &loop_info) override
Exécute la boucle loop_info en concurrence.
ITask * createRootTask(ITaskFunctor *f) override
Créé une tâche racine. L'implémentation doit recopier la valeur de f qui est soit un TaskFunctor,...
bool isActive() const override
Indique si l'implémentation est active.
void executeParallelFor(Integer begin, Integer size, Integer block_size, IRangeFunctor *f) override
Exécute le fonctor f en concurrence.
Int32 currentTaskThreadIndex() const override
Implémentation de TaskFactory::currentTaskThreadIndex()
void executeParallelFor(Integer begin, Integer size, const ParallelLoopOptions &options, IRangeFunctor *f) override
Exécute le fonctor f en concurrence.
Classe de base d'un observable.
Caractéristiques d'un boucle 1D multi-thread.
Definition ParallelFor.h:34
Options d'exécution d'une boucle parallèle en multi-thread.
void launchAndWait() override
Lance la tâche et bloque jusqu'à ce qu'elle se termine.
void launchAndWait(ConstArrayView< ITask * > tasks) override
Lance les tâches filles tasks et bloque jusqu'à ce qu'elles se terminent.
Contexte d'éxecution d'une tâche.
Definition Task.h:47
static IObservable * destroyThreadObservable()
Observable appelé lors de la destruction d'un thread pour une tâche.
static IObservable * createThreadObservable()
Observable appelé lors de la création d'un thread pour une tâche.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Int32 Integer
Type représentant un entier.
std::int32_t Int32
Type entier signé sur 32 bits.