Arcane  v4.1.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
TBBThreadImplementation.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* TBBThreadImplementation.cc (C) 2000-2025 */
9/* */
10/* Implémentation des threads utilisant TBB (Intel Threading Building Blocks). */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/IThreadImplementationService.h"
15#include "arcane/utils/IThreadBarrier.h"
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/IFunctor.h"
18#include "arcane/utils/Mutex.h"
19#include "arcane/utils/PlatformUtils.h"
20#include "arcane/utils/internal/DependencyInjection.h"
21
22#include "arcane/FactoryService.h"
23#include "arcane/Concurrency.h"
24
25#include "arcane/parallel/thread/ArcaneThreadMisc.h"
26
27#include <tbb/tbb.h>
28#if TBB_VERSION_MAJOR >= 2020
29#define ARCANE_TBB_USE_STDTHREAD
30#include <thread>
31#else
32#include <tbb/tbb_thread.h>
33#include <tbb/atomic.h>
34#include <tbb/mutex.h>
35#endif
36
37#include <mutex>
38#include <new>
39
40/*---------------------------------------------------------------------------*/
41/*---------------------------------------------------------------------------*/
42
43namespace Arcane
44{
45
46/*---------------------------------------------------------------------------*/
47/*---------------------------------------------------------------------------*/
48
49#ifdef ARCANE_TBB_USE_STDTHREAD
50typedef std::thread::id ThreadId;
51typedef std::thread ThreadType;
52// Essaie de convertir un std::thread::id en un 'Int64'.
53// Il n'existe pas de moyen portable de le faire donc on fait quelque
54// chose de pas forcément propre. A terme il serait préférable de supprimer
55// la méthode IThreadImplementation::currentThread().
56inline Int64 arcaneGetThisThreadId()
57{
58 Int64 v = std::hash<std::thread::id>{}(std::this_thread::get_id());
59 return v;
60}
61#else
// Holder for a native thread identifier (legacy TBB code path).
// arcaneGetThisThreadId() reinterprets a 'tbb::tbb_thread::id' as this type in
// order to read 'my_id'; presumably the layout mirrors the TBB-internal one --
// TODO confirm.
// NOTE(review): the 'class ThreadId' line was missing from this listing; the
// name is taken from the 'ThreadId*' cast in arcaneGetThisThreadId() --
// confirm against the original file.
class ThreadId
{
 public:
#if defined(_WIN32) || defined(_WIN64)
  DWORD my_id;
#else
  pthread_t my_id;
#endif // _WIN32||_WIN64
};
71
72typedef tbb::tbb_thread ThreadType;
73inline Int64 arcaneGetThisThreadId()
74{
75 ThreadType::id i = tbb::this_tbb_thread::get_id();
76 ThreadId* t = (ThreadId*)(&i);
77 Int64 v = Int64(t->my_id);
78 return v;
79}
80#endif
81
82/*---------------------------------------------------------------------------*/
83/*---------------------------------------------------------------------------*/
84
// Minimal mutex object: forwards lock()/unlock() to a std::mutex.
// NOTE(review): the 'class TBBMutexImpl' line was missing from this listing;
// the name is taken from the 'new TBBMutexImpl()' call in
// TBBThreadImplementation::createMutex() -- confirm against the original file.
class TBBMutexImpl
{
 public:
  // Blocks until the underlying mutex is acquired.
  void lock()
  {
    m_mutex.lock();
  }
  // Releases the underlying mutex.
  void unlock()
  {
    m_mutex.unlock();
  }
 private:
  std::mutex m_mutex;
};
99
100/*---------------------------------------------------------------------------*/
101/*---------------------------------------------------------------------------*/
102
103class TBBBarrier
104: public IThreadBarrier
105{
106 public:
107 TBBBarrier()
108 : m_nb_thread(0) {}
109
110 virtual void destroy(){ delete this; }
111
112 virtual void init(Integer nb_thread)
113 {
114 m_nb_thread = nb_thread;
115 m_nb_thread_finished = 0;
116 m_timestamp = 0;
117 };
118
119 virtual bool wait()
120 {
121 Int32 ts = m_timestamp;
122 int remaining_thread = m_nb_thread - m_nb_thread_finished.fetch_add(1) - 1;
123 if (remaining_thread > 0) {
124
125 int count = 1;
126 while (m_timestamp==ts){
127 arcaneDoCPUPause(count);
128 if (count<200)
129 count *= 2;
130 else{
131 //count = 0;
132 //__TBB_Yield();
133 //TODO: peut-être rendre la main (__TBB_Yield()) si trop
134 // d'itérations.
135 }
136 }
137
138 return false;
139 }
140 m_nb_thread_finished = 0;
141 ++m_timestamp;
142 return true;
143 }
144 private:
145 Int32 m_nb_thread;
146 std::atomic<Int32> m_nb_thread_finished;
147 std::atomic<Int32> m_timestamp;
148};
149
150/*---------------------------------------------------------------------------*/
151/*---------------------------------------------------------------------------*/
152
153extern "C" IThreadBarrier*
154createGlibThreadBarrier();
155
156/*---------------------------------------------------------------------------*/
157/*---------------------------------------------------------------------------*/
161class TBBThreadImplementation
164{
165 ARCCORE_DEFINE_REFERENCE_COUNTED_INCLASS_METHODS();
166
167 void addReference() override { ReferenceCounterImpl::addReference(); }
168 void removeReference() override { ReferenceCounterImpl::removeReference(); }
169
170 public:
171
172 class StartFunc
173 {
174 public:
175 StartFunc(IFunctor* f) : m_f(f){}
176 void operator()() { m_f->executeFunctor(); }
177 IFunctor* m_f;
178 };
179
180 public:
181
182 TBBThreadImplementation()
183 : m_use_tbb_barrier(false)
184 , m_global_mutex_impl(nullptr)
185 {
186 if (!platform::getEnvironmentVariable("ARCANE_SPINLOCK_BARRIER").null())
187 m_use_tbb_barrier = true;
188 m_std_thread_implementation = Arccore::Concurrency::createStdThreadImplementation();
189 }
190
191 ~TBBThreadImplementation() override
192 {
193 //std::cout << "DESTROYING TBB IMPLEMENTATION\n";
194 GlobalMutex::destroy();
195 if (m_global_mutex_impl)
196 this->destroyMutex(m_global_mutex_impl);
197 }
198
199 public:
200
201 void build()
202 {
203 }
204
205 void initialize() override
206 {
207 m_global_mutex_impl = createMutex();
208 GlobalMutex::init(m_global_mutex_impl);
209 }
210
211 public:
212
213 ThreadImpl* createThread(IFunctor* f) override
214 {
215 return reinterpret_cast<ThreadImpl*>(new ThreadType(StartFunc(f)));
216 }
217 void joinThread(ThreadImpl* t) override
218 {
219 ThreadType* tt = reinterpret_cast<ThreadType*>(t);
220 tt->join();
221 }
222 void destroyThread(ThreadImpl* t) override
223 {
224 ThreadType* tt = reinterpret_cast<ThreadType*>(t);
225 delete tt;
226 }
227
228 void createSpinLock(Int64* spin_lock_addr) override
229 {
230 void* addr = spin_lock_addr;
231 new (addr) tbb::spin_mutex();
232 }
233 void lockSpinLock(Int64* spin_lock_addr,Int64* scoped_spin_lock_addr) override
234 {
235 tbb::spin_mutex* s = reinterpret_cast<tbb::spin_mutex*>(spin_lock_addr);
236 tbb::spin_mutex::scoped_lock* sl = new (scoped_spin_lock_addr) tbb::spin_mutex::scoped_lock();
237 sl->acquire(*s);
238 }
239 void unlockSpinLock(Int64* spin_lock_addr,Int64* scoped_spin_lock_addr) override
240 {
241 ARCANE_UNUSED(spin_lock_addr);
242 tbb::spin_mutex::scoped_lock* s = reinterpret_cast<tbb::spin_mutex::scoped_lock*>(scoped_spin_lock_addr);
243 s->release();
244 //TODO: detruire le scoped_lock.
245 }
246
247 MutexImpl* createMutex() override
248 {
249 TBBMutexImpl* m = new TBBMutexImpl();
250 return reinterpret_cast<MutexImpl*>(m);
251 }
252 void destroyMutex(MutexImpl* mutex) override
253 {
254 TBBMutexImpl* tm = reinterpret_cast<TBBMutexImpl*>(mutex);
255 delete tm;
256 }
257 void lockMutex(MutexImpl* mutex) override
258 {
259 TBBMutexImpl* tm = reinterpret_cast<TBBMutexImpl*>(mutex);
260 tm->lock();
261 }
262 void unlockMutex(MutexImpl* mutex) override
263 {
264 TBBMutexImpl* tm = reinterpret_cast<TBBMutexImpl*>(mutex);
265 tm->unlock();
266 }
267
268 Int64 currentThread() override
269 {
270 Int64 v = arcaneGetThisThreadId();
271 return v;
272 }
273
274 IThreadBarrier* createBarrier() override
275 {
276 // Il faut utiliser les TBB uniquement si demandé car il utilise
277 // l'attente active ce qui peut vite mettre la machine à genoux
278 // si le nombre de thread total est supérieur au nombre de coeurs
279 // de la machine.
280 if (m_use_tbb_barrier)
281 return new TBBBarrier();
282 return m_std_thread_implementation->createBarrier();
283 }
284
285 private:
286
287 bool m_use_tbb_barrier;
288 MutexImpl* m_global_mutex_impl;
289 Ref<IThreadImplementation> m_std_thread_implementation;
290};
291
292/*---------------------------------------------------------------------------*/
293/*---------------------------------------------------------------------------*/
294
295class TBBThreadImplementationService
297{
298 public:
299
300 explicit TBBThreadImplementationService(const ServiceBuildInfo&) {}
301 TBBThreadImplementationService() = default;
302
303 public:
304
305 void build() {}
306 public:
307 Ref<IThreadImplementation> createImplementation() override
308 {
310 }
311};
312
313/*---------------------------------------------------------------------------*/
314/*---------------------------------------------------------------------------*/
315
316// TODO: a supprimer maintenant qu'on utilise 'DependencyInjection'
318 ServiceProperty("TBBThreadImplementationService",ST_Application),
320
321ARCANE_DI_REGISTER_PROVIDER(TBBThreadImplementationService,
322 DependencyInjection::ProviderProperty("TBBThreadImplementationService"),
323 ARCANE_DI_INTERFACES(IThreadImplementationService),
324 ARCANE_DI_EMPTY_CONSTRUCTOR());
325
326/*---------------------------------------------------------------------------*/
327/*---------------------------------------------------------------------------*/
328
329} // End namespace Arcane
330
331/*---------------------------------------------------------------------------*/
332/*---------------------------------------------------------------------------*/
#define ARCANE_SERVICE_INTERFACE(ainterface)
Macro pour déclarer une interface lors de l'enregistrement d'un service.
static void init(MutexImpl *p)
Initialise le mutex global. Interne à Arccore. Doit être alloué par new.
Definition Mutex.cc:60
Interface d'un service de gestion des threads.
Interface d'un service implémentant le support des threads.
Référence à une instance.
Implémentation thread-safe d'un compteur de référence.
Structure contenant les informations pour créer un service.
Propriétés de création d'un service.
virtual void destroy()
Détruit la barrière.
virtual bool wait()
Bloque et attend que tous les threads appellent cette méthode.
virtual void init(Integer nb_thread)
Initialise la barrière pour nb_thread.
Implémentation des threads utilisant TBB (Intel Threading Building Blocks).
#define ARCANE_REGISTER_SERVICE(aclass, a_service_property,...)
Macro pour enregistrer un service.
ARCCORE_BASE_EXPORT String getEnvironmentVariable(const String &name)
Variable d'environnement du nom name.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
std::int64_t Int64
Type entier signé sur 64 bits.
void arcaneDoCPUPause(Int32 count)
Utilise l'instruction 'pause' du CPU si possible.
Int32 Integer
Type représentant un entier.
@ ST_Application
Le service s'utilise au niveau de l'application.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Crée une référence sur un pointeur.
std::int32_t Int32
Type entier signé sur 32 bits.