Arcane  v3.14.10.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
TBBThreadImplementation.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* TBBThreadImplementation.cc (C) 2000-2024 */
9/* */
10/* Implémentation des threads utilisant TBB (Intel Threading Building Blocks). */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/IThreadImplementationService.h"
15#include "arcane/utils/IThreadBarrier.h"
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/IFunctor.h"
18#include "arcane/utils/Mutex.h"
19#include "arcane/utils/PlatformUtils.h"
20
21#include "arcane/FactoryService.h"
22#include "arcane/Concurrency.h"
23
24#include "arcane/parallel/thread/ArcaneThreadMisc.h"
25
26#include <tbb/tbb.h>
27#if TBB_VERSION_MAJOR >= 2020
28#define ARCANE_TBB_USE_STDTHREAD
29#include <thread>
30#else
31#include <tbb/tbb_thread.h>
32#include <tbb/atomic.h>
33#include <tbb/mutex.h>
34#endif
35
36#include <mutex>
37#include <new>
38
39/*---------------------------------------------------------------------------*/
40/*---------------------------------------------------------------------------*/
41
42namespace Arcane
43{
44
45/*---------------------------------------------------------------------------*/
46/*---------------------------------------------------------------------------*/
47
48#ifdef ARCANE_TBB_USE_STDTHREAD
49typedef std::thread::id ThreadId;
50typedef std::thread ThreadType;
51// Essaie de convertir un std::thread::id en un 'Int64'.
52// Il n'existe pas de moyen portable de le faire donc on fait quelque
53// chose de pas forcément propre. A terme il serait préférable de supprimer
54// la méthode IThreadImplementation::currentThread().
55inline Int64 arcaneGetThisThreadId()
56{
57 Int64 v = std::hash<std::thread::id>{}(std::this_thread::get_id());
58 return v;
59}
60#else
62{
63 public:
64#if defined(_WIN32) || defined(_WIN64)
65 DWORD my_id;
66#else
67 pthread_t my_id;
68#endif // _WIN32||_WIN64
69};
70
71typedef tbb::tbb_thread ThreadType;
72inline Int64 arcaneGetThisThreadId()
73{
74 ThreadType::id i = tbb::this_tbb_thread::get_id();
75 ThreadId* t = (ThreadId*)(&i);
76 Int64 v = Int64(t->my_id);
77 return v;
78}
79#endif
80
81/*---------------------------------------------------------------------------*/
82/*---------------------------------------------------------------------------*/
83
85{
86 public:
87 void lock()
88 {
89 m_mutex.lock();
90 }
91 void unlock()
92 {
93 m_mutex.unlock();
94 }
95 private:
96 std::mutex m_mutex;
97};
98
99/*---------------------------------------------------------------------------*/
100/*---------------------------------------------------------------------------*/
101
103: public IThreadBarrier
104{
105 public:
106 TBBBarrier()
107 : m_nb_thread(0) {}
108
109 virtual void destroy(){ delete this; }
110
111 virtual void init(Integer nb_thread)
112 {
113 m_nb_thread = nb_thread;
114 m_nb_thread_finished = 0;
115 m_timestamp = 0;
116 };
117
118 virtual bool wait()
119 {
120 Int32 ts = m_timestamp;
121 int remaining_thread = m_nb_thread - m_nb_thread_finished.fetch_add(1) - 1;
122 if (remaining_thread > 0) {
123
124 int count = 1;
125 while (m_timestamp==ts){
126 arcaneDoCPUPause(count);
127 if (count<200)
128 count *= 2;
129 else{
130 //count = 0;
131 //__TBB_Yield();
132 //TODO: peut-être rendre la main (__TBB_Yield()) si trop
133 // d'itérations.
134 }
135 }
136
137 return false;
138 }
139 m_nb_thread_finished = 0;
140 ++m_timestamp;
141 return true;
142 }
143 private:
144 Int32 m_nb_thread;
145 std::atomic<Int32> m_nb_thread_finished;
146 std::atomic<Int32> m_timestamp;
147};
148
149/*---------------------------------------------------------------------------*/
150/*---------------------------------------------------------------------------*/
151
152extern "C" IThreadBarrier*
153createGlibThreadBarrier();
154
155/*---------------------------------------------------------------------------*/
156/*---------------------------------------------------------------------------*/
163{
164 ARCCORE_DEFINE_REFERENCE_COUNTED_INCLASS_METHODS();
165
166 void addReference() override { ReferenceCounterImpl::addReference(); }
167 void removeReference() override { ReferenceCounterImpl::removeReference(); }
168
169 public:
170
172 {
173 public:
174 StartFunc(IFunctor* f) : m_f(f){}
175 void operator()() { m_f->executeFunctor(); }
176 IFunctor* m_f;
177 };
178
179 public:
180
182 : m_use_tbb_barrier(false)
183 , m_global_mutex_impl(nullptr)
184 {
185 if (!platform::getEnvironmentVariable("ARCANE_SPINLOCK_BARRIER").null())
186 m_use_tbb_barrier = true;
187 m_std_thread_implementation = Arccore::Concurrency::createStdThreadImplementation();
188 }
189
190 ~TBBThreadImplementation() override
191 {
192 //std::cout << "DESTROYING TBB IMPLEMENTATION\n";
193 GlobalMutex::destroy();
194 if (m_global_mutex_impl)
195 this->destroyMutex(m_global_mutex_impl);
196 }
197
198 public:
199
//! Nothing to build for this implementation.
void build() {}
203
204 void initialize() override
205 {
206 m_global_mutex_impl = createMutex();
207 GlobalMutex::init(m_global_mutex_impl);
208 }
209
210 public:
211
212 ThreadImpl* createThread(IFunctor* f) override
213 {
214 return reinterpret_cast<ThreadImpl*>(new ThreadType(StartFunc(f)));
215 }
216 void joinThread(ThreadImpl* t) override
217 {
218 ThreadType* tt = reinterpret_cast<ThreadType*>(t);
219 tt->join();
220 }
221 void destroyThread(ThreadImpl* t) override
222 {
223 ThreadType* tt = reinterpret_cast<ThreadType*>(t);
224 delete tt;
225 }
226
227 void createSpinLock(Int64* spin_lock_addr) override
228 {
229 void* addr = spin_lock_addr;
230 new (addr) tbb::spin_mutex();
231 }
232 void lockSpinLock(Int64* spin_lock_addr,Int64* scoped_spin_lock_addr) override
233 {
234 tbb::spin_mutex* s = reinterpret_cast<tbb::spin_mutex*>(spin_lock_addr);
235 tbb::spin_mutex::scoped_lock* sl = new (scoped_spin_lock_addr) tbb::spin_mutex::scoped_lock();
236 sl->acquire(*s);
237 }
238 void unlockSpinLock(Int64* spin_lock_addr,Int64* scoped_spin_lock_addr) override
239 {
240 ARCANE_UNUSED(spin_lock_addr);
241 tbb::spin_mutex::scoped_lock* s = reinterpret_cast<tbb::spin_mutex::scoped_lock*>(scoped_spin_lock_addr);
242 s->release();
243 //TODO: detruire le scoped_lock.
244 }
245
246 MutexImpl* createMutex() override
247 {
248 TBBMutexImpl* m = new TBBMutexImpl();
249 return reinterpret_cast<MutexImpl*>(m);
250 }
251 void destroyMutex(MutexImpl* mutex) override
252 {
253 TBBMutexImpl* tm = reinterpret_cast<TBBMutexImpl*>(mutex);
254 delete tm;
255 }
256 void lockMutex(MutexImpl* mutex) override
257 {
258 TBBMutexImpl* tm = reinterpret_cast<TBBMutexImpl*>(mutex);
259 tm->lock();
260 }
261 void unlockMutex(MutexImpl* mutex) override
262 {
263 TBBMutexImpl* tm = reinterpret_cast<TBBMutexImpl*>(mutex);
264 tm->unlock();
265 }
266
267 Int64 currentThread() override
268 {
269 Int64 v = arcaneGetThisThreadId();
270 return v;
271 }
272
273 IThreadBarrier* createBarrier() override
274 {
275 // Il faut utiliser les TBB uniquement si demandé car il utilise
276 // l'attente active ce qui peut vite mettre la machine à genoux
277 // si le nombre de thread total est supérieur au nombre de coeurs
278 // de la machine.
279 if (m_use_tbb_barrier)
280 return new TBBBarrier();
281 return m_std_thread_implementation->createBarrier();
282 }
283
284 private:
285
286 bool m_use_tbb_barrier;
287 MutexImpl* m_global_mutex_impl;
288 Ref<IThreadImplementation> m_std_thread_implementation;
289};
290
291/*---------------------------------------------------------------------------*/
292/*---------------------------------------------------------------------------*/
293
296{
297 public:
298
300
301 public:
302
//! Nothing to build for this service.
void build()
{
}
304 public:
305 Ref<IThreadImplementation> createImplementation() override
306 {
308 }
309};
310
311/*---------------------------------------------------------------------------*/
312/*---------------------------------------------------------------------------*/
313
315 ServiceProperty("TBBThreadImplementationService",ST_Application),
317
318/*---------------------------------------------------------------------------*/
319/*---------------------------------------------------------------------------*/
320
321} // End namespace Arcane
322
323/*---------------------------------------------------------------------------*/
324/*---------------------------------------------------------------------------*/
#define ARCANE_SERVICE_INTERFACE(ainterface)
Macro pour déclarer une interface lors de l'enregistrement d'un service.
Interface d'un service de gestion des threads.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:120
Structure contenant les informations pour créer un service.
Propriétés de création d'un service.
virtual void destroy()
Détruit la barrière.
virtual bool wait()
Bloque et attend que tous les threads appellent cette méthode.
Implémentation des threads utilisant TBB (Intel Threading Building Blocks).
static void init(MutexImpl *p)
Initialise le mutex global. Interne à Arccore. Doit être alloué par new.
Definition Mutex.cc:63
virtual void executeFunctor()=0
Exécute la méthode associée.
Interface d'un service implémentant le support des threads.
Implémentation thread-safe d'un compteur de référence.
#define ARCANE_REGISTER_SERVICE(aclass, a_service_property,...)
Macro pour enregistrer un service.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
void arcaneDoCPUPause(Int32 count)
Utilise l'instruction 'pause' du CPU si possible.
@ ST_Application
Le service s'utilise au niveau de l'application.