Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
AsyncQueue.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* AsyncQueue.cc (C) 2000-2024 */
9/* */
10/* Implementation of a shared memory message queue. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/parallel/thread/IAsyncQueue.h"
15
16#include "arcane/utils/PlatformUtils.h"
17#include "arcane/utils/ValueConvert.h"
18
19#include "arcane/parallel/thread/ArcaneThreadMisc.h"
20
21#include <queue>
22#include <mutex>
23#include <condition_variable>
24
25#include "arcane_packages.h"
26
27#ifdef ARCANE_HAS_PACKAGE_TBB
28#include <tbb/concurrent_queue.h>
29#endif
30
31/*---------------------------------------------------------------------------*/
32/*---------------------------------------------------------------------------*/
33
35{
36
37/*---------------------------------------------------------------------------*/
38/*---------------------------------------------------------------------------*/
39
46: public IAsyncQueue
47{
48 public:
49
50 void push(void* v) override
51 {
52 std::unique_lock<std::mutex> lg(m_mutex);
53 m_shared_queue.push(v);
54 // NOTE: normally it is not necessary to have the lock active
55 // when calling 'notify_one()' but this generates warnings
56 // with helgrind (valgrind). So we leave the lock to avoid this.
57 // It should be checked if this has any performance effects (in
58 // Arcane CI tests, this is not the case).
59 m_conditional_variable.notify_one();
60 }
61 void* pop() override
62 {
63 std::unique_lock<std::mutex> lg(m_mutex);
64 while (m_shared_queue.empty()) {
65 m_conditional_variable.wait(lg);
66 }
67 void* v = m_shared_queue.front();
68 m_shared_queue.pop();
69 return v;
70 }
71 void* tryPop() override
72 {
73 std::unique_lock<std::mutex> lg(m_mutex);
74 if (m_shared_queue.empty())
75 return nullptr;
76 void* p = m_shared_queue.front();
77 m_shared_queue.pop();
78 return p;
79 }
80
81 private:
82
83 std::queue<void*> m_shared_queue;
84 std::mutex m_mutex;
85 std::condition_variable m_conditional_variable;
86};
87
88/*---------------------------------------------------------------------------*/
89/*---------------------------------------------------------------------------*/
90
91#ifdef ARCANE_HAS_PACKAGE_TBB
92class TBBAsyncQueue
93: public IAsyncQueue
94{
95 public:
96
97 void push(void* v)
98 {
99 m_shared_queue.push(v);
100 }
101 void* pop()
102 {
103 void* v = 0;
104 int count = 1;
105 while (!m_shared_queue.try_pop(v)) {
106 arcaneDoCPUPause(count);
107 if (count < 100)
108 count *= 2;
109 }
110 return v;
111 }
112 void* tryPop()
113 {
114 void* v = 0;
115 m_shared_queue.try_pop(v);
116 return v;
117 }
118
119 private:
120
121 tbb::concurrent_queue<void*> m_shared_queue;
122};
123#endif
124
125/*---------------------------------------------------------------------------*/
126/*---------------------------------------------------------------------------*/
127
128IAsyncQueue* IAsyncQueue::
129createQueue()
130{
131 // By default, active waiting is not used because there is no
132 // notable difference in performance and it avoids contention when the number
133 // of available cores is less than the number of threads.
134 [[maybe_unused]] bool use_active_queue = false;
135 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCANE_USE_ACTIVE_SHM_QUEUE", true))
136 use_active_queue = (v.value() != 0);
137 IAsyncQueue* queue = nullptr;
138#ifdef ARCANE_HAS_PACKAGE_TBB
139 if (use_active_queue)
140 queue = new TBBAsyncQueue();
141#endif
142 if (!queue)
143 queue = new SharedMemoryBasicAsyncQueue();
144 return queue;
145}
146
147/*---------------------------------------------------------------------------*/
148/*---------------------------------------------------------------------------*/
149
150} // End namespace Arcane::MessagePassing
151
152/*---------------------------------------------------------------------------*/
153/*---------------------------------------------------------------------------*/
static std::optional< Int32 > tryParseFromEnvironment(StringView s, bool throw_if_invalid)
Asynchronous queue allowing the exchange of information between threads.
Definition IAsyncQueue.h:33
virtual void push(void *v)=0
Adds v to the queue.
Basic implementation of a multi-threaded queue.
Definition AsyncQueue.cc:47
void push(void *v) override
Adds v to the queue.
Definition AsyncQueue.cc:50
void * tryPop() override
Retrieves the first value if available. Returns nullptr otherwise.
Definition AsyncQueue.cc:71
void * pop() override
Retrieves the first value from the queue and blocks if there are none.
Definition AsyncQueue.cc:61
Declarations of types and methods used by message exchange mechanisms.