Arcane  v3.15.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
RunQueueImpl.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* RunQueueImpl.cc (C) 2000-2024 */
9/* */
10/* Gestion d'une file d'exécution sur accélérateur. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/accelerator/core/internal/RunQueueImpl.h"
15
16#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/SmallArray.h"
19
20#include "arcane/accelerator/core/internal/IRunnerRuntime.h"
21#include "arcane/accelerator/core/internal/IRunQueueStream.h"
22#include "arcane/accelerator/core/internal/RunCommandImpl.h"
23#include "arcane/accelerator/core/internal/RunnerImpl.h"
24#include "arcane/accelerator/core/internal/IRunQueueEventImpl.h"
25#include "arcane/accelerator/core/Runner.h"
26#include "arcane/accelerator/core/RunQueueBuildInfo.h"
27#include "arcane/accelerator/core/DeviceId.h"
28#include "arcane/accelerator/core/RunQueueEvent.h"
29
30#include <unordered_set>
31
32/*---------------------------------------------------------------------------*/
33/*---------------------------------------------------------------------------*/
34
35namespace Arcane::Accelerator::impl
36{
37
38/*---------------------------------------------------------------------------*/
39/*---------------------------------------------------------------------------*/
40
43{
44 public:
45
46 explicit Lock(RunQueueImpl* p)
47 {
48 if (p->m_use_pool_mutex) {
49 m_mutex = p->m_pool_mutex.get();
50 if (m_mutex) {
51 m_mutex->lock();
52 }
53 }
54 }
55 ~Lock()
56 {
57 if (m_mutex)
58 m_mutex->unlock();
59 }
60 Lock(const Lock&) = delete;
61 Lock& operator=(const Lock&) = delete;
62
63 private:
64
65 std::mutex* m_mutex = nullptr;
66};
67
68/*---------------------------------------------------------------------------*/
69/*---------------------------------------------------------------------------*/
70
71RunQueueImpl::
72RunQueueImpl(RunnerImpl* runner_impl, Int32 id, const RunQueueBuildInfo& bi)
73: m_runner_impl(runner_impl)
74, m_execution_policy(runner_impl->executionPolicy())
75, m_runtime(runner_impl->runtime())
76, m_queue_stream(m_runtime->createStream(bi))
77, m_id(id)
78{
79}
80
81/*---------------------------------------------------------------------------*/
82/*---------------------------------------------------------------------------*/
83
84RunQueueImpl::
85~RunQueueImpl()
86{
87 delete m_queue_stream;
88}
89
90/*---------------------------------------------------------------------------*/
91/*---------------------------------------------------------------------------*/
92
93/*---------------------------------------------------------------------------*/
94/*---------------------------------------------------------------------------*/
95
96void RunQueueImpl::
97_freeCommandsInPool()
98{
99 bool is_check = arcaneIsCheck();
100 std::unordered_set<RunCommandImpl*> command_set;
101 while (!m_run_command_pool.empty()) {
102 RunCommandImpl* c = m_run_command_pool.top();
103 if (is_check) {
104 if (command_set.find(c) != command_set.end())
105 std::cerr << "Command is present several times in the command pool\n";
106 command_set.insert(c);
107 }
108 RunCommand::_internalDestroyImpl(c);
109 m_run_command_pool.pop();
110 }
111}
112
113/*---------------------------------------------------------------------------*/
114/*---------------------------------------------------------------------------*/
115
116void RunQueueImpl::
117_destroy(RunQueueImpl* q)
118{
119 q->_freeCommandsInPool();
120 delete q;
121}
122
123/*---------------------------------------------------------------------------*/
124/*---------------------------------------------------------------------------*/
125
126/*---------------------------------------------------------------------------*/
127/*---------------------------------------------------------------------------*/
128
129void RunQueueImpl::
130_release()
131{
132 // S'il reste des commandes en cours d'exécution au moment de libérer
133 // la file d'exécution il faut attendre pour éviter des fuites mémoire car
134 // les commandes ne seront pas désallouées.
135 // TODO: Regarder s'il ne faudrait pas plutôt indiquer cela à l'utilisateur
136 // ou faire une erreur fatale.
137 if (!m_active_run_command_list.empty()) {
138 if (!_internalStream()->_barrierNoException()) {
140 }
141 else
142 std::cerr << "WARNING: Error in internal accelerator barrier\n";
143 }
144 if (_isInPool())
145 m_runner_impl->_internalPutRunQueueImplInPool(this);
146 else {
147 delete this;
148 }
149}
150
151/*---------------------------------------------------------------------------*/
152/*---------------------------------------------------------------------------*/
153
154void RunQueueImpl::
155_setDefaultMemoryRessource()
156{
157 m_memory_ressource = eMemoryRessource::Host;
158 if (isAcceleratorPolicy(m_execution_policy))
159 m_memory_ressource = eMemoryRessource::UnifiedMemory;
160}
161
162/*---------------------------------------------------------------------------*/
163/*---------------------------------------------------------------------------*/
164
165MemoryAllocationOptions RunQueueImpl::
166allocationOptions() const
167{
168 MemoryAllocationOptions opt = MemoryUtils::getAllocationOptions(m_memory_ressource);
169 Int16 device_id = static_cast<Int16>(m_runner_impl->deviceId().asInt32());
170 opt.setDevice(device_id);
171 return opt;
172}
173
174/*---------------------------------------------------------------------------*/
175/*---------------------------------------------------------------------------*/
176
177bool RunQueueImpl::
178isAutoPrefetchCommand() const
179{
180 return m_runner_impl->isAutoPrefetchCommand();
181}
182
183/*---------------------------------------------------------------------------*/
184/*---------------------------------------------------------------------------*/
185
186void RunQueueImpl::
187copyMemory(const MemoryCopyArgs& args) const
188{
189 _internalStream()->copyMemory(args);
190}
191
192/*---------------------------------------------------------------------------*/
193/*---------------------------------------------------------------------------*/
194
195void RunQueueImpl::
196prefetchMemory(const MemoryPrefetchArgs& args) const
197{
198 _internalStream()->prefetchMemory(args);
199}
200
201/*---------------------------------------------------------------------------*/
202/*---------------------------------------------------------------------------*/
203
204void RunQueueImpl::
205recordEvent(RunQueueEvent& event)
206{
207 auto* p = event._internalEventImpl();
208 return p->recordQueue(_internalStream());
209}
210
211/*---------------------------------------------------------------------------*/
212/*---------------------------------------------------------------------------*/
213
214void RunQueueImpl::
215waitEvent(RunQueueEvent& event)
216{
217 auto* p = event._internalEventImpl();
218 return p->waitForEvent(_internalStream());
219}
220
221/*---------------------------------------------------------------------------*/
222/*---------------------------------------------------------------------------*/
223
224RunQueueImpl* RunQueueImpl::
225create(RunnerImpl* r)
226{
227 return _reset(r->_internalCreateOrGetRunQueueImpl());
228}
229
230/*---------------------------------------------------------------------------*/
231/*---------------------------------------------------------------------------*/
232
233RunQueueImpl* RunQueueImpl::
234create(RunnerImpl* r, const RunQueueBuildInfo& bi)
235{
236 return _reset(r->_internalCreateOrGetRunQueueImpl(bi));
237}
238
239/*---------------------------------------------------------------------------*/
240/*---------------------------------------------------------------------------*/
241
242RunCommandImpl* RunQueueImpl::
243_internalCreateOrGetRunCommandImpl()
244{
245 RunCommandImpl* p = nullptr;
246
247 {
248 auto& pool = m_run_command_pool;
249 Lock my_lock(this);
250 if (!pool.empty()) {
251 p = pool.top();
252 pool.pop();
253 }
254 }
255 if (!p)
256 p = RunCommand::_internalCreateImpl(this);
257 p->_reset();
258 return p;
259}
260
261/*---------------------------------------------------------------------------*/
262/*---------------------------------------------------------------------------*/
271{
272 if (m_use_pool_mutex) {
274 // Recopie les commandes dans un tableau local car m_active_run_command_list
275 // peut être modifié par un autre thread.
276 {
277 Lock my_lock(this);
279 command_list.add(p);
280 }
282 }
283 for (RunCommandImpl* p : command_list) {
284 p->notifyEndExecuteKernel();
285 }
286 {
287 Lock my_lock(this);
288 for (RunCommandImpl* p : command_list) {
290 }
291 }
292 }
293 else {
295 p->notifyEndExecuteKernel();
297 }
299 }
300}
301
302/*---------------------------------------------------------------------------*/
303/*---------------------------------------------------------------------------*/
316{
317 if (p->m_has_living_run_command)
318 p->m_may_be_put_in_pool = true;
319 else
320 m_run_command_pool.push(p);
321}
322
323/*---------------------------------------------------------------------------*/
324/*---------------------------------------------------------------------------*/
325
326void RunQueueImpl::
327_addRunningCommand(RunCommandImpl* p)
328{
329 Lock my_lock(this);
331}
332
333/*---------------------------------------------------------------------------*/
334/*---------------------------------------------------------------------------*/
335
336void RunQueueImpl::
337_putInCommandPool(RunCommandImpl* p)
338{
339 Lock my_lock(this);
340 m_run_command_pool.push(p);
341}
342
343/*---------------------------------------------------------------------------*/
344/*---------------------------------------------------------------------------*/
350{
351 _internalStream()->barrier();
353}
354
355/*---------------------------------------------------------------------------*/
356/*---------------------------------------------------------------------------*/
366{
367 p->m_is_async = false;
368 p->_setDefaultMemoryRessource();
369 return p;
370}
371
372/*---------------------------------------------------------------------------*/
373/*---------------------------------------------------------------------------*/
374
375void RunQueueImpl::
376setConcurrentCommandCreation(bool v)
377{
378 m_use_pool_mutex = v;
379 if (!m_pool_mutex.get())
380 m_pool_mutex = std::make_unique<std::mutex>();
381}
382
383/*---------------------------------------------------------------------------*/
384/*---------------------------------------------------------------------------*/
385
386void RunQueueImpl::
387dumpStats(std::ostream& ostr) const
388{
389 ostr << "nb_pool=" << m_run_command_pool.size()
390 << " nb_active=" << m_active_run_command_list.size() << "\n";
391}
392
393/*---------------------------------------------------------------------------*/
394/*---------------------------------------------------------------------------*/
395
396} // namespace Arcane::Accelerator::impl
397
398/*---------------------------------------------------------------------------*/
399/*---------------------------------------------------------------------------*/
Fonctions de gestion mémoire et des allocateurs.
Int32 asInt32() const
Valeur numérique du device.
Definition DeviceId.h:69
Informations pour créer une RunQueue.
virtual void barrier()=0
Bloque jusqu'à ce que toutes les actions associées à cette file soient terminées.
virtual void prefetchMemory(const MemoryPrefetchArgs &args)=0
Effectue un pré-chargement d'une zone mémoire.
virtual void copyMemory(const MemoryCopyArgs &args)=0
Effectue une copie entre deux zones mémoire.
Implémentation d'une commande pour accélérateur.
Verrou pour le pool de RunCommand en multi-thread.
File d'exécution pour accélérateur.
void _checkPutCommandInPoolNoLock(RunCommandImpl *p)
Remet la commande dans le pool si possible.
static RunQueueImpl * _reset(RunQueueImpl *p)
Réinitialise l'implémentation.
eMemoryRessource m_memory_ressource
Ressource mémoire par défaut.
void _internalBarrier()
Bloque jusqu'à ce que toutes les commandes soient terminées.
std::stack< RunCommandImpl * > m_run_command_pool
Pool de commandes.
UniqueArray< RunCommandImpl * > m_active_run_command_list
Liste des commandes en cours d'exécution.
void _internalFreeRunningCommands()
Libère les commandes en cours d'exécution.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:149
bool isAcceleratorPolicy(eExecutionPolicy exec_policy)
Indique si exec_policy correspond à un accélérateur.
MemoryAllocationOptions getAllocationOptions(eMemoryResource mem_resource)
Allocation par défaut pour la ressource mem_resource.
bool arcaneIsCheck()
Vrai si on est en mode vérification.
Definition Misc.cc:68
std::int32_t Int32
Type entier signé sur 32 bits.