Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
SharedMemoryParallelDispatch.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SharedMemoryParallelDispatch.cc (C) 2000-2025 */
9/* */
10/* Implementation of shared memory messages. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/ArcanePrecomp.h"
15
16#include "arcane/utils/Array.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/String.h"
19#include "arcane/utils/ITraceMng.h"
20#include "arcane/utils/NumericTypes.h"
21#include "arcane/utils/APReal.h"
22#include "arcane/utils/NotImplementedException.h"
23#include "arcane/utils/MemoryView.h"
25
26#include "arcane/core/MeshVariable.h"
27#include "arcane/core/IParallelMng.h"
28#include "arcane/core/ItemGroup.h"
29#include "arcane/core/IMesh.h"
30#include "arcane/core/IBase.h"
31
32#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
33#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
34#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
35
36#include "arccore/message_passing/PointToPointMessageInfo.h"
37
38/*---------------------------------------------------------------------------*/
39/*---------------------------------------------------------------------------*/
40
42{
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
46
47/*
48 * TODO: To simplify debugging when there is a timing skew of collective calls
49 * between threads, a barrier should be implemented per collective call type,
50 * whereas currently all collective calls use the same barrier (via _collectiveBarrier()).
51 * Because of this, problems can occur that are not easily detectable. For example:
52 *
53 * Thread1:
54 * allGather();
55 * barrier();
56 * allReduce();
57 * Thread2:
58 * barrier();
59 * allGather();
60 * allReduce();
61 *
62 * In this case, the code will not crash, but the collective values will not be correct.
63 */
64
65/*---------------------------------------------------------------------------*/
66/*---------------------------------------------------------------------------*/
67
68SharedMemoryParallelDispatchBase::
69SharedMemoryParallelDispatchBase(ITraceMng* tm, SharedMemoryParallelMng* parallel_mng,
70 ISharedMemoryMessageQueue* message_queue,
71 ArrayView<SharedMemoryParallelDispatchBase*> all_dispatchs_base)
72: TraceAccessor(tm)
73, m_parallel_mng(parallel_mng)
74, m_rank(parallel_mng->commRank())
75, m_nb_rank(parallel_mng->commSize())
76, m_message_queue(message_queue)
77, m_all_dispatchs_base(all_dispatchs_base)
78{
79}
80
81/*---------------------------------------------------------------------------*/
82/*---------------------------------------------------------------------------*/
83
84void SharedMemoryParallelDispatchBase::
85_collectiveBarrier()
86{
87 m_parallel_mng->getThreadBarrier()->wait();
88}
89
90/*---------------------------------------------------------------------------*/
91/*---------------------------------------------------------------------------*/
92
93void SharedMemoryParallelDispatchBase::
94_genericAllToAll(ConstMemoryView send_buf, MutableMemoryView recv_buf, Int32 count)
95{
96 Int32 nb_rank = m_nb_rank;
97
98 //TODO: Implement a version without allocation
99 Int32UniqueArray send_count(nb_rank, count);
100 Int32UniqueArray recv_count(nb_rank, count);
101
102 Int32UniqueArray send_indexes(nb_rank);
103 Int32UniqueArray recv_indexes(nb_rank);
104 for (Integer i = 0; i < nb_rank; ++i) {
105 send_indexes[i] = count * i;
106 recv_indexes[i] = count * i;
107 }
108 _genericAllToAllVariable(send_buf, send_count, send_indexes, recv_buf, recv_count, recv_indexes);
109}
110
111/*---------------------------------------------------------------------------*/
112/*---------------------------------------------------------------------------*/
113
114void SharedMemoryParallelDispatchBase::
115_genericAllToAllVariable(ConstMemoryView send_buf,
116 Span<const Int32> send_count,
117 Span<const Int32> send_index,
118 MutableMemoryView recv_buf,
119 Span<const Int32> recv_count,
120 Span<const Int32> recv_index)
121{
122 m_alltoallv_infos.send_buf = send_buf;
123 m_alltoallv_infos.send_count = send_count;
124 m_alltoallv_infos.send_index = send_index;
125 m_alltoallv_infos.recv_buf = recv_buf;
126 m_alltoallv_infos.recv_count = recv_count;
127 m_alltoallv_infos.recv_index = recv_index;
128 _collectiveBarrier();
129 Integer global_index = 0;
130 Int32 my_rank = m_rank;
131 MutableMemoryView recv_mem_buf(recv_buf);
132 for (Integer i = 0; i < m_nb_rank; ++i) {
133 AllToAllVariableInfo ainfo = m_all_dispatchs_base[i]->m_alltoallv_infos;
134 ConstMemoryView view(ainfo.send_buf);
135 Integer index = ainfo.send_index[my_rank];
136 Integer count = ainfo.send_count[my_rank];
137 MemoryUtils::copyHost(recv_mem_buf.subView(global_index, count), view.subView(index, count));
138 global_index += count;
139 }
140 _collectiveBarrier();
141}
142
143/*---------------------------------------------------------------------------*/
144/*---------------------------------------------------------------------------*/
145
146void SharedMemoryParallelDispatchBase::
147_genericAllGather(ConstMemoryView send_buf, MutableMemoryView recv_buf)
148{
149 m_const_view = send_buf;
150 _collectiveBarrier();
151 MutableMemoryView recv_mem_view(recv_buf);
152 Int64 index = 0;
153 for (Int32 i = 0; i < m_nb_rank; ++i) {
154 ConstMemoryView view(m_all_dispatchs_base[i]->m_const_view);
155 Int64 size = view.nbElement();
156 MemoryUtils::copyHost(recv_mem_view.subView(index, size), view);
157 index += size;
158 }
159 _collectiveBarrier();
160}
161
162/*---------------------------------------------------------------------------*/
163/*---------------------------------------------------------------------------*/
164
165void SharedMemoryParallelDispatchBase::
166_genericAllGatherVariable(ConstMemoryView send_buf, IResizableArray* recv_buf)
167{
168 m_const_view = send_buf;
169 _collectiveBarrier();
170 Int64 total_size = 0;
171 for (Integer i = 0; i < m_nb_rank; ++i) {
172 total_size += m_all_dispatchs_base[i]->m_const_view.nbElement();
173 }
174 recv_buf->resize(total_size);
175 MutableMemoryView recv_mem_view(recv_buf->memoryView());
176 Int64 index = 0;
177 for (Integer i = 0; i < m_nb_rank; ++i) {
178 ConstMemoryView view(m_all_dispatchs_base[i]->m_const_view);
179 Int64 size = view.nbElement();
180 MemoryUtils::copyHost(recv_mem_view.subView(index, size), view);
181 index += size;
182 }
183 _collectiveBarrier();
184}
185
186/*---------------------------------------------------------------------------*/
187/*---------------------------------------------------------------------------*/
188
189void SharedMemoryParallelDispatchBase::
190_genericScatterVariable(ConstMemoryView send_buf, MutableMemoryView recv_buf, Int32 root)
191{
192 m_const_view = send_buf;
193 m_recv_view = recv_buf;
194 _collectiveBarrier();
195 if (m_rank == root) {
196 ConstMemoryView const_view(m_const_view);
197 Int64 index = 0;
198 for (Integer i = 0; i < m_nb_rank; ++i) {
199 MutableMemoryView view(m_all_dispatchs_base[i]->m_recv_view);
200 Int64 size = view.nbElement();
201 MemoryUtils::copyHost(view, const_view.subView(index, size));
202 index += size;
203 }
204 }
205 _collectiveBarrier();
206}
207
208/*---------------------------------------------------------------------------*/
209/*---------------------------------------------------------------------------*/
210
211Request SharedMemoryParallelDispatchBase::
212_genericSend(ConstMemoryView send_buffer, const PointToPointMessageInfo& message2)
213{
214 PointToPointMessageInfo message(message2);
215 message.setEmiterRank(MessageRank(m_rank));
216 bool is_blocking = message.isBlocking();
217 if (message.isRankTag()) {
218 Request r = m_message_queue->addSend(message, SendBufferInfo(send_buffer));
219 if (is_blocking) {
220 m_message_queue->waitAll(ArrayView<Request>(1, &r));
221 return Request();
222 }
223 return r;
224 }
225 if (message.isMessageId()) {
226 // The send with a MessageId does not exist.
227 ARCCORE_THROW(NotSupportedException, "Invalid generic send with MessageId");
228 }
229 ARCCORE_THROW(NotSupportedException, "Invalid message_info");
230}
231
232/*---------------------------------------------------------------------------*/
233/*---------------------------------------------------------------------------*/
234
235Request SharedMemoryParallelDispatchBase::
236_genericReceive(MutableMemoryView recv_buffer, const PointToPointMessageInfo& message2)
237{
238 PointToPointMessageInfo message(message2);
239 bool is_blocking = message.isBlocking();
240 message.setEmiterRank(MessageRank(m_rank));
241 ReceiveBufferInfo buf{ recv_buffer };
242 Request r = m_message_queue->addReceive(message, buf);
243 if (is_blocking) {
244 m_message_queue->waitAll(ArrayView<Request>(1, &r));
245 return MP::Request();
246 }
247 return r;
248}
249
250/*---------------------------------------------------------------------------*/
251/*---------------------------------------------------------------------------*/
252
253void SharedMemoryParallelDispatchBase::
254_genericBroadcast(MutableMemoryView send_buf, Int32 rank)
255{
256 m_broadcast_view = send_buf;
257 _collectiveBarrier();
258 MemoryUtils::copyHost(m_broadcast_view, m_all_dispatchs_base[rank]->m_broadcast_view);
259 _collectiveBarrier();
260}
261
262/*---------------------------------------------------------------------------*/
263/*---------------------------------------------------------------------------*/
264
265/*---------------------------------------------------------------------------*/
266/*---------------------------------------------------------------------------*/
267
268template <class Type> SharedMemoryParallelDispatch<Type>::
269SharedMemoryParallelDispatch(ITraceMng* tm, SharedMemoryParallelMng* parallel_mng,
270 ISharedMemoryMessageQueue* message_queue,
271 impl::ShareMemoryDispatcherContainer<Type>& containers)
272: BaseClass(tm, parallel_mng, message_queue, containers.all_dispatchs_base)
273, m_all_dispatchs(containers.all_dispatchs)
274{
275 m_reduce_infos.m_index = 0;
276 m_all_dispatchs[m_rank] = this;
277 m_all_dispatchs_base[m_rank] = this;
278}
279
280/*---------------------------------------------------------------------------*/
281/*---------------------------------------------------------------------------*/
282
283template <class Type> SharedMemoryParallelDispatch<Type>::
284~SharedMemoryParallelDispatch()
285{
286 finalize();
287}
288
289/*---------------------------------------------------------------------------*/
290/*---------------------------------------------------------------------------*/
291
292template <class Type> void SharedMemoryParallelDispatch<Type>::
293finalize()
294{
295}
296
297/*---------------------------------------------------------------------------*/
298/*---------------------------------------------------------------------------*/
299
300template <typename T>
301class _ThreadIntegralType
302{
303 public:
304
305 typedef FalseType IsIntegral;
306};
307
308#define ARCANE_DEFINE_INTEGRAL_TYPE(datatype) \
309 template <> \
310 class _ThreadIntegralType<datatype> \
311 { \
312 public: \
313\
314 typedef TrueType IsIntegral; \
315 }
316
317ARCANE_DEFINE_INTEGRAL_TYPE(long long);
318ARCANE_DEFINE_INTEGRAL_TYPE(long);
319ARCANE_DEFINE_INTEGRAL_TYPE(int);
320ARCANE_DEFINE_INTEGRAL_TYPE(short);
321ARCANE_DEFINE_INTEGRAL_TYPE(unsigned long long);
322ARCANE_DEFINE_INTEGRAL_TYPE(unsigned long);
323ARCANE_DEFINE_INTEGRAL_TYPE(unsigned int);
324ARCANE_DEFINE_INTEGRAL_TYPE(unsigned short);
325ARCANE_DEFINE_INTEGRAL_TYPE(double);
326ARCANE_DEFINE_INTEGRAL_TYPE(float);
327ARCANE_DEFINE_INTEGRAL_TYPE(HPReal);
328
329/*---------------------------------------------------------------------------*/
330/*---------------------------------------------------------------------------*/
331
332namespace
333{
334
335 template <class Type> void
336 _computeMinMaxSum2(ArrayView<SharedMemoryParallelDispatch<Type>*> all_dispatchs,
337 Type& min_val, Type& max_val, Type& sum_val,
338 Int32& min_rank, Int32& max_rank, Int32 nb_rank, FalseType)
339 {
340 ARCANE_UNUSED(all_dispatchs);
341 ARCANE_UNUSED(min_val);
342 ARCANE_UNUSED(max_val);
343 ARCANE_UNUSED(sum_val);
344 ARCANE_UNUSED(min_rank);
345 ARCANE_UNUSED(max_rank);
346 ARCANE_UNUSED(nb_rank);
347
348 throw NotImplementedException(A_FUNCINFO);
349 }
350
351 /*---------------------------------------------------------------------------*/
352 /*---------------------------------------------------------------------------*/
353
354 template <class Type> void
355 _computeMinMaxSum2(ArrayView<SharedMemoryParallelDispatch<Type>*> all_dispatchs,
356 Type& min_val, Type& max_val, Type& sum_val,
357 Int32& min_rank, Int32& max_rank, Int32 nb_rank, TrueType)
358 {
359 Type _min_val = all_dispatchs[0]->m_reduce_infos.reduce_value;
360 Type _max_val = _min_val;
361 Type _sum_val = _min_val;
362 Integer _min_rank = 0;
363 Integer _max_rank = 0;
364 for (Integer i = 1; i < nb_rank; ++i) {
365 Type cval = all_dispatchs[i]->m_reduce_infos.reduce_value;
366 if (cval < _min_val) {
367 _min_val = cval;
368 _min_rank = i;
369 }
370 if (cval > _max_val) {
371 _max_val = cval;
372 _max_rank = i;
373 }
374 _sum_val = (Type)(_sum_val + cval);
375 }
376 min_val = _min_val;
377 max_val = _max_val;
378 sum_val = _sum_val;
379 min_rank = _min_rank;
380 max_rank = _max_rank;
381 }
382
383} // namespace
384
385/*---------------------------------------------------------------------------*/
386/*---------------------------------------------------------------------------*/
387
388template <class Type> void SharedMemoryParallelDispatch<Type>::
389computeMinMaxSum(Type val, Type& min_val, Type& max_val, Type& sum_val,
390 Int32& min_rank, Int32& max_rank)
391{
392 typedef typename _ThreadIntegralType<Type>::IsIntegral IntegralType;
393 m_reduce_infos.reduce_value = val;
394 _collectiveBarrier();
395 _computeMinMaxSum2(m_all_dispatchs, min_val, max_val, sum_val, min_rank, max_rank, m_nb_rank, IntegralType());
396 _collectiveBarrier();
397}
398
399/*---------------------------------------------------------------------------*/
400/*---------------------------------------------------------------------------*/
401
402template <class Type> void SharedMemoryParallelDispatch<Type>::
403computeMinMaxSum(ConstArrayView<Type> values,
404 ArrayView<Type> min_values,
405 ArrayView<Type> max_values,
406 ArrayView<Type> sum_values,
407 ArrayView<Int32> min_ranks,
408 ArrayView<Int32> max_ranks)
409{
410 // Suboptimal implementation that does not vectorize the calculation
411 // (it is currently a copy-paste from above put into a loop)
412 typedef typename _ThreadIntegralType<Type>::IsIntegral IntegralType;
413 Integer n = values.size();
414 for (Integer i = 0; i < n; ++i) {
415 m_reduce_infos.reduce_value = values[i];
416 _collectiveBarrier();
417 _computeMinMaxSum2(m_all_dispatchs, min_values[i], max_values[i], sum_values[i],
418 min_ranks[i], max_ranks[i], m_nb_rank, IntegralType());
419 _collectiveBarrier();
420 }
421}
422
423/*---------------------------------------------------------------------------*/
424/*---------------------------------------------------------------------------*/
425
426template <class Type> void SharedMemoryParallelDispatch<Type>::
427broadcast(Span<Type> send_buf, Int32 rank)
428{
429 _genericBroadcast(MutableMemoryView(send_buf), rank);
430}
431
432/*---------------------------------------------------------------------------*/
433/*---------------------------------------------------------------------------*/
434
435template <class Type> void SharedMemoryParallelDispatch<Type>::
436allGather(Span<const Type> send_buf, Span<Type> recv_buf)
437{
438 _genericAllGather(ConstMemoryView{ send_buf }, MutableMemoryView{ recv_buf });
439}
440
441/*---------------------------------------------------------------------------*/
442/*---------------------------------------------------------------------------*/
443
444template <class Type> void SharedMemoryParallelDispatch<Type>::
445gather(Span<const Type> send_buf, Span<Type> recv_buf, Int32 root_rank)
446{
447 UniqueArray<Type> tmp_buf;
448 if (m_rank == root_rank)
449 allGather(send_buf, recv_buf);
450 else {
451 tmp_buf.resize(send_buf.size() * m_nb_rank);
452 allGather(send_buf, tmp_buf);
453 }
454}
455
456/*---------------------------------------------------------------------------*/
457/*---------------------------------------------------------------------------*/
458
459template <class Type> void SharedMemoryParallelDispatch<Type>::
460allGatherVariable(Span<const Type> send_buf, Array<Type>& recv_buf)
461{
462 ResizableArrayRef recv_buf_ref(recv_buf);
463 _genericAllGatherVariable(ConstMemoryView(send_buf), &recv_buf_ref);
464}
465
466/*---------------------------------------------------------------------------*/
467/*---------------------------------------------------------------------------*/
468
469template <class Type> void SharedMemoryParallelDispatch<Type>::
470gatherVariable(Span<const Type> send_buf, Array<Type>& recv_buf, Int32 root_rank)
471{
472 UniqueArray<Type> tmp_buf;
473 if (m_rank == root_rank)
474 allGatherVariable(send_buf, recv_buf);
475 else
476 allGatherVariable(send_buf, tmp_buf);
477}
478
479/*---------------------------------------------------------------------------*/
480/*---------------------------------------------------------------------------*/
481
482template <class Type> void SharedMemoryParallelDispatch<Type>::
483scatterVariable(Span<const Type> send_buf, Span<Type> recv_buf, Int32 root)
484{
485 _genericScatterVariable(ConstMemoryView(send_buf), MutableMemoryView(recv_buf), root);
486}
487
488/*---------------------------------------------------------------------------*/
489/*---------------------------------------------------------------------------*/
490
491template <class Type> void SharedMemoryParallelDispatch<Type>::
492allToAll(Span<const Type> send_buf, Span<Type> recv_buf, Int32 count)
493{
494 _genericAllToAll(ConstMemoryView(send_buf), MutableMemoryView(recv_buf), count);
495}
496
497/*---------------------------------------------------------------------------*/
498/*---------------------------------------------------------------------------*/
499
500template <class Type> void SharedMemoryParallelDispatch<Type>::
501allToAllVariable(Span<const Type> send_buf, ConstArrayView<Int32> send_count,
502 ConstArrayView<Int32> send_index,
503 Span<Type> recv_buf, ConstArrayView<Int32> recv_count,
504 Int32ConstArrayView recv_index)
505{
506 _genericAllToAllVariable(ConstMemoryView(send_buf), send_count, send_index,
507 MutableMemoryView(recv_buf), recv_count, recv_index);
508}
509
510/*---------------------------------------------------------------------------*/
511/*---------------------------------------------------------------------------*/
512
513template <class Type> auto SharedMemoryParallelDispatch<Type>::
514send(Span<const Type> send_buffer, Int32 rank, bool is_blocking) -> Request
515{
516 auto block_mode = (is_blocking) ? Parallel::Blocking : Parallel::NonBlocking;
517 auto p2p_message = m_parallel_mng->buildMessage(rank, block_mode);
518 return send(send_buffer, p2p_message);
519}
520
521/*---------------------------------------------------------------------------*/
522/*---------------------------------------------------------------------------*/
523
524template <class Type> void SharedMemoryParallelDispatch<Type>::
525send(ConstArrayView<Type> send_buf, Int32 rank)
526{
527 send(send_buf, rank, true);
528}
529
530/*---------------------------------------------------------------------------*/
531/*---------------------------------------------------------------------------*/
532
533template <class Type> Parallel::Request SharedMemoryParallelDispatch<Type>::
534receive(Span<Type> recv_buffer, Int32 rank, bool is_blocking)
535{
536 auto block_mode = (is_blocking) ? Parallel::Blocking : Parallel::NonBlocking;
537 auto p2p_message = m_parallel_mng->buildMessage(rank, block_mode);
538 return receive(recv_buffer, p2p_message);
539}
540
541/*---------------------------------------------------------------------------*/
542/*---------------------------------------------------------------------------*/
543
544template <class Type> Request SharedMemoryParallelDispatch<Type>::
545send(Span<const Type> send_buffer, const PointToPointMessageInfo& message2)
546{
547 return _genericSend(ConstMemoryView(send_buffer), message2);
548}
549
550/*---------------------------------------------------------------------------*/
551/*---------------------------------------------------------------------------*/
552
553template <class Type> Request SharedMemoryParallelDispatch<Type>::
554receive(Span<Type> recv_buffer, const PointToPointMessageInfo& message2)
555{
556 return _genericReceive(MutableMemoryView(recv_buffer), message2);
557}
558
559/*---------------------------------------------------------------------------*/
560/*---------------------------------------------------------------------------*/
561
562template <class Type> void SharedMemoryParallelDispatch<Type>::
563recv(ArrayView<Type> recv_buffer, Integer rank)
564{
565 recv(recv_buffer, rank, true);
566}
567
568/*---------------------------------------------------------------------------*/
569/*---------------------------------------------------------------------------*/
570
571template <class Type> void SharedMemoryParallelDispatch<Type>::
572sendRecv(ConstArrayView<Type> send_buffer, ArrayView<Type> recv_buffer, Integer proc)
573{
574 ARCANE_UNUSED(send_buffer);
575 ARCANE_UNUSED(recv_buffer);
576 ARCANE_UNUSED(proc);
577 throw NotImplementedException(A_FUNCINFO);
578}
579
580/*---------------------------------------------------------------------------*/
581/*---------------------------------------------------------------------------*/
582
583template <class Type> Type SharedMemoryParallelDispatch<Type>::
584allReduce(eReduceType op, Type send_buf)
585{
586 m_reduce_infos.reduce_value = send_buf;
587 //cout << "ALL REDUCE BEGIN RANk=" << m_rank << " TYPE=" << (int)op << " MY=" << send_buf << '\n';
588 cout.flush();
589 _collectiveBarrier();
590 Type ret = m_all_dispatchs[0]->m_reduce_infos.reduce_value;
591 switch (op) {
592 case Parallel::ReduceMin:
593 for (Integer i = 1; i < m_nb_rank; ++i)
594 ret = math::min(ret, m_all_dispatchs[i]->m_reduce_infos.reduce_value);
595 break;
596 case Parallel::ReduceMax:
597 for (Integer i = 1; i < m_nb_rank; ++i)
598 ret = math::max(ret, m_all_dispatchs[i]->m_reduce_infos.reduce_value);
599 break;
600 case Parallel::ReduceSum:
601 for (Integer i = 1; i < m_nb_rank; ++i)
602 ret = (Type)(ret + m_all_dispatchs[i]->m_reduce_infos.reduce_value);
603 break;
604 default:
605 ARCANE_FATAL("Bad reduce type {0}", (int)op);
606 }
607 //cout << "ALL REDUCE RANK=" << m_rank << " TYPE=" << (int)op << " MY=" << send_buf << " GLOBAL=" << ret << '\n';
608 _collectiveBarrier();
609 return ret;
610}
611
612/*---------------------------------------------------------------------------*/
613/*---------------------------------------------------------------------------*/
614
615template <class Type> void SharedMemoryParallelDispatch<Type>::
616_allReduceOrScan(eReduceType op, Span<Type> send_buf, bool is_scan)
617{
618 m_reduce_infos.reduce_buf = send_buf;
619 ++m_reduce_infos.m_index;
620 Int64 buf_size = send_buf.size();
621 UniqueArray<Type> ret(buf_size);
622 //cout << "ALL REDUCE BEGIN RANk=" << m_rank << " TYPE=" << (int)op << " MY=" << send_buf << '\n';
623 //cout.flush();
624 _collectiveBarrier();
625 {
626 Integer index0 = m_all_dispatchs[0]->m_reduce_infos.m_index;
627 for (Integer i = 0; i < m_nb_rank; ++i) {
628 Integer indexi = m_all_dispatchs[i]->m_reduce_infos.m_index;
629 if (index0 != m_all_dispatchs[i]->m_reduce_infos.m_index) {
630 ARCANE_FATAL("INTERNAL: incoherent all reduce i0={0} in={1} n={2}",
631 index0, indexi, i);
632 }
633 }
634 }
635 Int32 nb_rank = m_nb_rank;
636 if (is_scan)
637 nb_rank = m_rank + 1;
638 for (Integer j = 0; j < buf_size; ++j)
639 ret[j] = m_all_dispatchs[0]->m_reduce_infos.reduce_buf[j];
640 switch (op) {
641 case Parallel::ReduceMin:
642 for (Integer i = 1; i < nb_rank; ++i)
643 for (Integer j = 0; j < buf_size; ++j)
644 ret[j] = math::min(ret[j], m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
645 break;
646 case Parallel::ReduceMax:
647 for (Integer i = 1; i < nb_rank; ++i)
648 for (Integer j = 0; j < buf_size; ++j)
649 ret[j] = math::max(ret[j], m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
650 break;
651 case Parallel::ReduceSum:
652 for (Integer i = 1; i < nb_rank; ++i)
653 for (Integer j = 0; j < buf_size; ++j)
654 ret[j] = (Type)(ret[j] + m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
655 break;
656 default:
657 ARCANE_FATAL("Bad reduce type");
658 }
659 //cout << "ALL REDUCE RANK=" << m_rank << " TYPE=" << (int)op << " MY=" << send_buf << " GLOBAL=" << ret << '\n';
660 _collectiveBarrier();
661 for (Integer j = 0; j < buf_size; ++j)
662 send_buf[j] = ret[j];
663}
664
665/*---------------------------------------------------------------------------*/
666/*---------------------------------------------------------------------------*/
667
668template <class Type> void SharedMemoryParallelDispatch<Type>::
669allReduce(eReduceType op, Span<Type> send_buf)
670{
671 _allReduceOrScan(op, send_buf, false);
672}
673
674/*---------------------------------------------------------------------------*/
675/*---------------------------------------------------------------------------*/
676
677template <class Type> Request SharedMemoryParallelDispatch<Type>::
678nonBlockingAllReduce(eReduceType op, Span<const Type> send_buf, Span<Type> recv_buf)
679{
680 ARCANE_UNUSED(op);
681 ARCANE_UNUSED(send_buf);
682 ARCANE_UNUSED(recv_buf);
683 throw NotImplementedException(A_FUNCINFO);
684}
685
686/*---------------------------------------------------------------------------*/
687/*---------------------------------------------------------------------------*/
688
689template <class Type> Request SharedMemoryParallelDispatch<Type>::
690nonBlockingAllGather(Span<const Type> send_buf, Span<Type> recv_buf)
691{
692 ARCANE_UNUSED(send_buf);
693 ARCANE_UNUSED(recv_buf);
694 throw NotImplementedException(A_FUNCINFO);
695}
696
697/*---------------------------------------------------------------------------*/
698/*---------------------------------------------------------------------------*/
699
700template <class Type> Request SharedMemoryParallelDispatch<Type>::
701nonBlockingBroadcast(Span<Type> send_buf, Int32 rank)
702{
703 ARCANE_UNUSED(send_buf);
704 ARCANE_UNUSED(rank);
705 throw NotImplementedException(A_FUNCINFO);
706}
707
708/*---------------------------------------------------------------------------*/
709/*---------------------------------------------------------------------------*/
710
711template <class Type> Request SharedMemoryParallelDispatch<Type>::
712nonBlockingGather(Span<const Type> send_buf, Span<Type> recv_buf, Int32 rank)
713{
714 ARCANE_UNUSED(send_buf);
715 ARCANE_UNUSED(recv_buf);
716 ARCANE_UNUSED(rank);
717 throw NotImplementedException(A_FUNCINFO);
718}
719
720/*---------------------------------------------------------------------------*/
721/*---------------------------------------------------------------------------*/
722
723template <class Type> Request SharedMemoryParallelDispatch<Type>::
724nonBlockingAllToAll(Span<const Type> send_buf, Span<Type> recv_buf, Int32 count)
725{
726 ARCANE_UNUSED(send_buf);
727 ARCANE_UNUSED(recv_buf);
728 ARCANE_UNUSED(count);
729 throw NotImplementedException(A_FUNCINFO);
730}
731
732/*---------------------------------------------------------------------------*/
733/*---------------------------------------------------------------------------*/
734
735template <class Type> Request SharedMemoryParallelDispatch<Type>::
736nonBlockingAllToAllVariable(Span<const Type> send_buf, ConstArrayView<Int32> send_count,
737 ConstArrayView<Int32> send_index, Span<Type> recv_buf,
738 ConstArrayView<Int32> recv_count, ConstArrayView<Int32> recv_index)
739{
740 ARCANE_UNUSED(send_buf);
741 ARCANE_UNUSED(recv_buf);
742 ARCANE_UNUSED(send_count);
743 ARCANE_UNUSED(recv_count);
744 ARCANE_UNUSED(send_index);
745 ARCANE_UNUSED(recv_index);
746 throw NotImplementedException(A_FUNCINFO);
747}
748
749/*---------------------------------------------------------------------------*/
750/*---------------------------------------------------------------------------*/
751
752template <class Type> Type SharedMemoryParallelDispatch<Type>::
753scan(eReduceType op, Type send_buf)
754{
755 ARCANE_UNUSED(op);
756 ARCANE_UNUSED(send_buf);
757 throw NotImplementedException(A_FUNCINFO);
758}
759
760/*---------------------------------------------------------------------------*/
761/*---------------------------------------------------------------------------*/
762
763template <class Type> void SharedMemoryParallelDispatch<Type>::
764scan(eReduceType op, ArrayView<Type> send_buf)
765{
766 _allReduceOrScan(op, send_buf, true);
767}
768
769/*---------------------------------------------------------------------------*/
770/*---------------------------------------------------------------------------*/
771
772template <class Type> void SharedMemoryParallelDispatch<Type>::
773waitAll()
774{
775 // TEMPORARY: not used for now
776 throw NotImplementedException(A_FUNCINFO);
777}
778
779/*---------------------------------------------------------------------------*/
780/*---------------------------------------------------------------------------*/
781
782template <class Type> Request SharedMemoryParallelDispatch<Type>::
784{
785 throw NotImplementedException(A_FUNCINFO);
786}
787
788/*---------------------------------------------------------------------------*/
789/*---------------------------------------------------------------------------*/
790
791template class SharedMemoryParallelDispatch<char>;
792template class SharedMemoryParallelDispatch<signed char>;
793template class SharedMemoryParallelDispatch<unsigned char>;
794template class SharedMemoryParallelDispatch<short>;
795template class SharedMemoryParallelDispatch<unsigned short>;
796template class SharedMemoryParallelDispatch<int>;
797template class SharedMemoryParallelDispatch<unsigned int>;
798template class SharedMemoryParallelDispatch<long>;
799template class SharedMemoryParallelDispatch<unsigned long>;
800template class SharedMemoryParallelDispatch<long long>;
801template class SharedMemoryParallelDispatch<unsigned long long>;
802template class SharedMemoryParallelDispatch<float>;
803template class SharedMemoryParallelDispatch<double>;
804template class SharedMemoryParallelDispatch<long double>;
805template class SharedMemoryParallelDispatch<APReal>;
806template class SharedMemoryParallelDispatch<Real2>;
807template class SharedMemoryParallelDispatch<Real3>;
808template class SharedMemoryParallelDispatch<Real2x2>;
809template class SharedMemoryParallelDispatch<Real3x3>;
810template class SharedMemoryParallelDispatch<HPReal>;
811
812/*---------------------------------------------------------------------------*/
813/*---------------------------------------------------------------------------*/
814
815} // End namespace Arcane::MessagePassing
816
817/*---------------------------------------------------------------------------*/
818/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
#define ARCCORE_THROW(exception_class,...)
Macro to throw an exception with formatting.
Memory and allocator management functions.
Brief information for a 'gather' message for data type DataType.
Declarations of types and methods used by message exchange mechanisms.
Int32 Integer
Type representing an integer.
UniqueArray< Int32 > Int32UniqueArray
Dynamic 1D array of 32-bit integers.
Definition UtilsTypes.h:341
Type
Type of JSON value.
Definition rapidjson.h:730