Arcane  v3.15.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
AsyncQueue.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* AsyncQueue.cc (C) 2000-2024 */
9/* */
10/* Implémentation d'une file de messages en mémoire partagée. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/thread/IAsyncQueue.h"
15
16#include "arcane/utils/PlatformUtils.h"
17#include "arcane/utils/ValueConvert.h"
18
19#include "arcane/parallel/thread/ArcaneThreadMisc.h"
20
21#include <queue>
22#include <mutex>
23#include <condition_variable>
24
25#include "arcane_packages.h"
26
27#ifdef ARCANE_HAS_PACKAGE_TBB
28#include <tbb/concurrent_queue.h>
29#endif
30
31/*---------------------------------------------------------------------------*/
32/*---------------------------------------------------------------------------*/
33
35{
36
37/*---------------------------------------------------------------------------*/
38/*---------------------------------------------------------------------------*/
45: public IAsyncQueue
46{
47 public:
48
49 void push(void* v) override
50 {
51 std::unique_lock<std::mutex> lg(m_mutex);
52 m_shared_queue.push(v);
53 // NOTE: normalement il n'y a pas besoin d'avoir le verrou actif
54 // lors de l'appel à 'notify_one()' mais cela génère des avertissements
55 // avec helgrind (valgrind). Du coup on laisse le verrou pour éviter cela.
56 // Il faudrait vérifier si cela à des effets sur les performances (dans
57 // les tests Arcane du CI ce n'est pas le cas).
58 m_conditional_variable.notify_one();
59 }
60 void* pop() override
61 {
62 std::unique_lock<std::mutex> lg(m_mutex);
63 while (m_shared_queue.empty()) {
64 m_conditional_variable.wait(lg);
65 }
66 void* v = m_shared_queue.front();
67 m_shared_queue.pop();
68 return v;
69 }
70 void* tryPop() override
71 {
72 std::unique_lock<std::mutex> lg(m_mutex);
73 if (m_shared_queue.empty())
74 return nullptr;
75 void* p = m_shared_queue.front();
76 m_shared_queue.pop();
77 return p;
78 }
79
80 private:
81
82 std::queue<void*> m_shared_queue;
83 std::mutex m_mutex;
84 std::condition_variable m_conditional_variable;
85};
86
87/*---------------------------------------------------------------------------*/
88/*---------------------------------------------------------------------------*/
89
90#ifdef ARCANE_HAS_PACKAGE_TBB
91class TBBAsyncQueue
92: public IAsyncQueue
93{
94 public:
95
96 void push(void* v)
97 {
98 m_shared_queue.push(v);
99 }
100 void* pop()
101 {
102 void* v = 0;
103 int count = 1;
104 while (!m_shared_queue.try_pop(v)) {
105 arcaneDoCPUPause(count);
106 if (count < 100)
107 count *= 2;
108 }
109 return v;
110 }
111 void* tryPop()
112 {
113 void* v = 0;
114 m_shared_queue.try_pop(v);
115 return v;
116 }
117
118 private:
119
120 tbb::concurrent_queue<void*> m_shared_queue;
121};
122#endif
123
124/*---------------------------------------------------------------------------*/
125/*---------------------------------------------------------------------------*/
126
127IAsyncQueue* IAsyncQueue::
128createQueue()
129{
130 // Par défaut n'utilise pas l'attente active car il n'y a pas de différence
131 // notable de performance et cela évite des contentions lorsque le nombre
132 // de coeurs disponibles est inférieure au nombre de threads.
133 [[maybe_unused]] bool use_active_queue = false;
134 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCANE_USE_ACTIVE_SHM_QUEUE", true))
135 use_active_queue = (v.value() != 0);
136 IAsyncQueue* queue = nullptr;
137#ifdef ARCANE_HAS_PACKAGE_TBB
138 if (use_active_queue)
139 queue = new TBBAsyncQueue();
140#endif
141 if (!queue)
142 queue = new SharedMemoryBasicAsyncQueue();
143 return queue;
144}
145
146/*---------------------------------------------------------------------------*/
147/*---------------------------------------------------------------------------*/
148
149/*---------------------------------------------------------------------------*/
150/*---------------------------------------------------------------------------*/
151
152} // End namespace Arcane::MessagePassing
153
154/*---------------------------------------------------------------------------*/
155/*---------------------------------------------------------------------------*/
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:149
File asynchrone permettant d'échanger des informations entre threads.
Definition IAsyncQueue.h:32
Implémentation basique d'une file multi-thread.
Definition AsyncQueue.cc:46
void push(void *v) override
Ajoute v dans la file.
Definition AsyncQueue.cc:49
void * tryPop() override
Récupère la première valeur s'il y en. Retourne nullptr sinon.
Definition AsyncQueue.cc:70
void * pop() override
Récupère la première valeur de la file et bloque s'il n'y en a pas.
Definition AsyncQueue.cc:60
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Definition Parallel.h:94