Arcane  v3.16.8.0
Documentation utilisateur
Chargement...
Recherche...
Aucune correspondance
RunQueueImpl.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* RunQueueImpl.cc (C) 2000-2025 */
9/* */
10/* Gestion d'une file d'exécution sur accélérateur. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/accelerator/core/internal/RunQueueImpl.h"
15
17#include "arcane/utils/SmallArray.h"
18
19#include "arcane/accelerator/core/internal/IRunnerRuntime.h"
20#include "arcane/accelerator/core/internal/IRunQueueStream.h"
21#include "arcane/accelerator/core/internal/RunCommandImpl.h"
22#include "arcane/accelerator/core/internal/RunnerImpl.h"
23#include "arcane/accelerator/core/internal/IRunQueueEventImpl.h"
24#include "arcane/accelerator/core/Runner.h"
25#include "arcane/accelerator/core/DeviceId.h"
26#include "arcane/accelerator/core/RunQueueEvent.h"
27#include "arcane/accelerator/core/KernelLaunchArgs.h"
28
29#include <unordered_set>
30
31/*---------------------------------------------------------------------------*/
32/*---------------------------------------------------------------------------*/
33
34namespace Arcane::Accelerator::impl
35{
36
37/*---------------------------------------------------------------------------*/
38/*---------------------------------------------------------------------------*/
39
40//! Verrou pour le pool de RunCommand en multi-thread.
42{
43 public:
44
45 explicit Lock(RunQueueImpl* p)
46 {
47 if (p->m_use_pool_mutex) {
48 m_mutex = p->m_pool_mutex.get();
49 if (m_mutex) {
50 m_mutex->lock();
51 }
52 }
53 }
54 ~Lock()
55 {
56 if (m_mutex)
57 m_mutex->unlock();
58 }
59 Lock(const Lock&) = delete;
60 Lock& operator=(const Lock&) = delete;
61
62 private:
63
64 std::mutex* m_mutex = nullptr;
65};
66
67/*---------------------------------------------------------------------------*/
68/*---------------------------------------------------------------------------*/
69
70RunQueueImpl::
71RunQueueImpl(RunnerImpl* runner_impl, Int32 id, const RunQueueBuildInfo& bi)
72: m_runner_impl(runner_impl)
73, m_execution_policy(runner_impl->executionPolicy())
74, m_runtime(runner_impl->runtime())
75, m_queue_stream(m_runtime->createStream(bi))
76, m_id(id)
77{
78}
79
80/*---------------------------------------------------------------------------*/
81/*---------------------------------------------------------------------------*/
82
83RunQueueImpl::
84~RunQueueImpl()
85{
86 delete m_queue_stream;
87}
88
89/*---------------------------------------------------------------------------*/
90/*---------------------------------------------------------------------------*/
91
92/*---------------------------------------------------------------------------*/
93/*---------------------------------------------------------------------------*/
94
95void RunQueueImpl::
96_freeCommandsInPool()
97{
98 bool is_check = arcaneIsCheck();
99 std::unordered_set<RunCommandImpl*> command_set;
100 while (!m_run_command_pool.empty()) {
101 RunCommandImpl* c = m_run_command_pool.top();
102 if (is_check) {
103 if (command_set.find(c) != command_set.end())
104 std::cerr << "Command is present several times in the command pool\n";
105 command_set.insert(c);
106 }
107 RunCommand::_internalDestroyImpl(c);
108 m_run_command_pool.pop();
109 }
110}
111
112/*---------------------------------------------------------------------------*/
113/*---------------------------------------------------------------------------*/
114
115void RunQueueImpl::
116_destroy(RunQueueImpl* q)
117{
118 q->_freeCommandsInPool();
119 delete q;
120}
121
122/*---------------------------------------------------------------------------*/
123/*---------------------------------------------------------------------------*/
124
125/*---------------------------------------------------------------------------*/
126/*---------------------------------------------------------------------------*/
127
128void RunQueueImpl::
129_release()
130{
131 // S'il reste des commandes en cours d'exécution au moment de libérer
132 // la file d'exécution il faut attendre pour éviter des fuites mémoire car
133 // les commandes ne seront pas désallouées.
134 // TODO: Regarder s'il ne faudrait pas plutôt indiquer cela à l'utilisateur
135 // ou faire une erreur fatale.
136 if (!m_active_run_command_list.empty()) {
137 if (!_internalStream()->_barrierNoException()) {
138 _internalFreeRunningCommands();
139 }
140 else
141 std::cerr << "WARNING: Error in internal accelerator barrier\n";
142 }
143 if (_isInPool())
144 m_runner_impl->_internalPutRunQueueImplInPool(this);
145 else {
146 RunQueueImpl::_destroy(this);
147 }
148}
149
150/*---------------------------------------------------------------------------*/
151/*---------------------------------------------------------------------------*/
152
153void RunQueueImpl::
154_setDefaultMemoryRessource()
155{
156 m_memory_ressource = eMemoryRessource::Host;
157 if (isAcceleratorPolicy(m_execution_policy))
158 m_memory_ressource = MemoryUtils::getDefaultDataMemoryResource();
159}
160
161/*---------------------------------------------------------------------------*/
162/*---------------------------------------------------------------------------*/
163
164MemoryAllocationOptions RunQueueImpl::
165allocationOptions() const
166{
167 MemoryAllocationOptions opt = MemoryUtils::getAllocationOptions(m_memory_ressource);
168 Int16 device_id = static_cast<Int16>(m_runner_impl->deviceId().asInt32());
169 opt.setDevice(device_id);
170 return opt;
171}
172
173/*---------------------------------------------------------------------------*/
174/*---------------------------------------------------------------------------*/
175
176bool RunQueueImpl::
177isAutoPrefetchCommand() const
178{
179 return m_runner_impl->isAutoPrefetchCommand();
180}
181
182/*---------------------------------------------------------------------------*/
183/*---------------------------------------------------------------------------*/
184
185void RunQueueImpl::
186copyMemory(const MemoryCopyArgs& args) const
187{
188 _internalStream()->copyMemory(args);
189}
190
191/*---------------------------------------------------------------------------*/
192/*---------------------------------------------------------------------------*/
193
194void RunQueueImpl::
195prefetchMemory(const MemoryPrefetchArgs& args) const
196{
197 _internalStream()->prefetchMemory(args);
198}
199
200/*---------------------------------------------------------------------------*/
201/*---------------------------------------------------------------------------*/
202
203void RunQueueImpl::
204recordEvent(RunQueueEvent& event)
205{
206 auto* p = event._internalEventImpl();
207 return p->recordQueue(_internalStream());
208}
209
210/*---------------------------------------------------------------------------*/
211/*---------------------------------------------------------------------------*/
212
213void RunQueueImpl::
214waitEvent(RunQueueEvent& event)
215{
216 auto* p = event._internalEventImpl();
217 return p->waitForEvent(_internalStream());
218}
219
220/*---------------------------------------------------------------------------*/
221/*---------------------------------------------------------------------------*/
222
223RunQueueImpl* RunQueueImpl::
224create(RunnerImpl* r)
225{
226 return _reset(r->_internalCreateOrGetRunQueueImpl());
227}
228
229/*---------------------------------------------------------------------------*/
230/*---------------------------------------------------------------------------*/
231
232RunQueueImpl* RunQueueImpl::
233create(RunnerImpl* r, const RunQueueBuildInfo& bi)
234{
235 return _reset(r->_internalCreateOrGetRunQueueImpl(bi));
236}
237
238/*---------------------------------------------------------------------------*/
239/*---------------------------------------------------------------------------*/
240
241RunCommandImpl* RunQueueImpl::
242_internalCreateOrGetRunCommandImpl()
243{
244 RunCommandImpl* p = nullptr;
245
246 {
247 auto& pool = m_run_command_pool;
248 Lock my_lock(this);
249 if (!pool.empty()) {
250 p = pool.top();
251 pool.pop();
252 }
253 }
254 if (!p)
255 p = RunCommand::_internalCreateImpl(this);
256 p->_reset();
257 return p;
258}
259
260/*---------------------------------------------------------------------------*/
261/*---------------------------------------------------------------------------*/
262/*!
263 * \brief Libère les commandes en cours d'exécution.
264 *
265 * Cette méthode est en général appelée après une barrière ce qui garantit
266 * que les commandes asynchrones sont terminées.
267 */
268void RunQueueImpl::
269_internalFreeRunningCommands()
270{
271 if (m_use_pool_mutex) {
272 SmallArray<RunCommandImpl*> command_list;
273 // Recopie les commandes dans un tableau local car m_active_run_command_list
274 // peut être modifié par un autre thread.
275 {
276 Lock my_lock(this);
277 for (RunCommandImpl* p : m_active_run_command_list) {
278 command_list.add(p);
279 }
280 m_active_run_command_list.clear();
281 }
282 for (RunCommandImpl* p : command_list) {
283 p->notifyEndExecuteKernel();
284 }
285 {
286 Lock my_lock(this);
287 for (RunCommandImpl* p : command_list) {
288 _checkPutCommandInPoolNoLock(p);
289 }
290 }
291 }
292 else {
293 for (RunCommandImpl* p : m_active_run_command_list) {
294 p->notifyEndExecuteKernel();
295 _checkPutCommandInPoolNoLock(p);
296 }
297 m_active_run_command_list.clear();
298 }
299}
300
301/*---------------------------------------------------------------------------*/
302/*---------------------------------------------------------------------------*/
303/*!
304 * \brief Remet la commande dans le pool si possible.
305 *
306 * On ne remet pas la commande dans le pool tant qu'il y a une RunCommand
307 * qui y fait référence. Dans ce cas la commande sera remise dans le pool
308 * lors de l'appel au destructeur de RunCommand. Cela est nécessaire pour
309 * gérer le cas où une RunCommand est créée mais n'est jamais utilisée car
310 * dans ce cas elle ne sera jamais dans m_active_run_command_list et ne
311 * sera pas traitée lors de l'appel à _internalFreeRunningCommands().
312 */
313void RunQueueImpl::
314_checkPutCommandInPoolNoLock(RunCommandImpl* p)
315{
316 if (p->m_has_living_run_command)
317 p->m_may_be_put_in_pool = true;
318 else
319 m_run_command_pool.push(p);
320}
321
322/*---------------------------------------------------------------------------*/
323/*---------------------------------------------------------------------------*/
324
325void RunQueueImpl::
326_addRunningCommand(RunCommandImpl* p)
327{
328 Lock my_lock(this);
329 m_active_run_command_list.add(p);
330}
331
332/*---------------------------------------------------------------------------*/
333/*---------------------------------------------------------------------------*/
334
335void RunQueueImpl::
336_putInCommandPool(RunCommandImpl* p)
337{
338 Lock my_lock(this);
339 m_run_command_pool.push(p);
340}
341
342/*---------------------------------------------------------------------------*/
343/*---------------------------------------------------------------------------*/
344/*!
345 * \brief Bloque jusqu'à ce que toutes les commandes soient terminées.
346 */
347void RunQueueImpl::
348_internalBarrier()
349{
350 _internalStream()->barrier();
351 _internalFreeRunningCommands();
352}
353
354/*---------------------------------------------------------------------------*/
355/*---------------------------------------------------------------------------*/
356/*!
357 * \brief Réinitialise l'implémentation
358 *
359 * Cette méthode est appelée lorsqu'on va initialiser une RunQueue avec
360 * cette instance. Il faut dans ce cas réinitialiser les valeurs de l'instance
361 * qui dépendent de l'état actuel.
362 */
363RunQueueImpl* RunQueueImpl::
364_reset(RunQueueImpl* p)
365{
366 p->m_is_async = false;
367 p->_setDefaultMemoryRessource();
368 return p;
369}
370
371/*---------------------------------------------------------------------------*/
372/*---------------------------------------------------------------------------*/
373
374void RunQueueImpl::
375setConcurrentCommandCreation(bool v)
376{
377 m_use_pool_mutex = v;
378 if (!m_pool_mutex.get())
379 m_pool_mutex = std::make_unique<std::mutex>();
380}
381
382/*---------------------------------------------------------------------------*/
383/*---------------------------------------------------------------------------*/
384
385void RunQueueImpl::
386dumpStats(std::ostream& ostr) const
387{
388 ostr << "nb_pool=" << m_run_command_pool.size()
389 << " nb_active=" << m_active_run_command_list.size() << "\n";
390}
391
392/*---------------------------------------------------------------------------*/
393/*---------------------------------------------------------------------------*/
394
395} // namespace Arcane::Accelerator::impl
396
397/*---------------------------------------------------------------------------*/
398/*---------------------------------------------------------------------------*/
Fonctions de gestion mémoire et des allocateurs.
Informations pour créer une RunQueue.
Verrou pour le pool de RunCommand en multi-thread.
bool isAcceleratorPolicy(eExecutionPolicy exec_policy)
Indique si exec_policy correspond à un accélérateur.
MemoryAllocationOptions getAllocationOptions(eMemoryResource mem_resource)
Allocation par défaut pour la ressource mem_resource.
eMemoryResource getDefaultDataMemoryResource()
Ressource mémoire utilisée par l'allocateur par défaut pour les données.
bool arcaneIsCheck()
Vrai si on est en mode vérification.
Definition Misc.cc:68
std::int16_t Int16
Type entier signé sur 16 bits.
std::int32_t Int32
Type entier signé sur 32 bits.