Arcane  v3.16.7.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
SharedMemoryParallelDispatch.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SharedMemoryParallelDispatch.cc (C) 2000-2025 */
9/* */
10/* Implémentation des messages en mémoire partagée. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/ArcanePrecomp.h"
15
16#include "arcane/utils/Array.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/String.h"
19#include "arcane/utils/ITraceMng.h"
20#include "arcane/utils/NumericTypes.h"
21#include "arcane/utils/APReal.h"
22#include "arcane/utils/NotImplementedException.h"
23#include "arcane/utils/MemoryView.h"
25
26#include "arcane/MeshVariable.h"
27#include "arcane/IParallelMng.h"
28#include "arcane/ItemGroup.h"
29#include "arcane/IMesh.h"
30#include "arcane/IBase.h"
31
32#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
33#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
34#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
35
36#include "arccore/message_passing/PointToPointMessageInfo.h"
37
38/*---------------------------------------------------------------------------*/
39/*---------------------------------------------------------------------------*/
40
42{
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
46
47/*
48 * TODO: pour simplifier le debug lorsqu'il y a un décalage des appels
49 * collectifs entre les threads, il faudrait faire un type de barrière
50 * par type d'appel collectif alors qu'actuellement tous les appels
51 * collectifs utilisent la même barrière (via _collectiveBarrier()).
52 * A cause de cela, des problèmes peuvent survenir qui ne sont pas
53 * facilement détectable. Par exemple:
54 *
55 * Thread1:
56 * allGather();
57 * barrier();
58 * allReduce();
59 * Thread2:
60 * barrier();
61 * allGather();
62 * allReduce();
63 *
64 * Dans ce cas, le code ne plantera pas mais les valeurs des collectives ne
65 * seront pas bonnes.
66 */
67/*---------------------------------------------------------------------------*/
68/*---------------------------------------------------------------------------*/
69
/*!
 * \brief Builds the dispatcher for the rank given by \a parallel_mng->commRank().
 *
 * \a all_dispatchs_base is a view, shared by all the threads, of the
 * dispatchers of every rank; the collective operations use it to read the
 * buffers published by the other ranks directly.
 */
SharedMemoryParallelDispatchBase::
SharedMemoryParallelDispatchBase(ITraceMng* tm,SharedMemoryParallelMng* parallel_mng,
                                 ISharedMemoryMessageQueue* message_queue,
                                 ArrayView<SharedMemoryParallelDispatchBase*> all_dispatchs_base)
: TraceAccessor(tm)
, m_parallel_mng(parallel_mng)
, m_rank(parallel_mng->commRank())
, m_nb_rank(parallel_mng->commSize())
, m_message_queue(message_queue)
, m_all_dispatchs_base(all_dispatchs_base)
{
}
82
83/*---------------------------------------------------------------------------*/
84/*---------------------------------------------------------------------------*/
85
/*!
 * \brief Blocks until every rank (thread) has reached this barrier.
 *
 * All collective operations currently share this single barrier
 * (see the TODO note at the top of this file about the debugging
 * consequences of that choice).
 */
void SharedMemoryParallelDispatchBase::
_collectiveBarrier()
{
  m_parallel_mng->getThreadBarrier()->wait();
}
91
92/*---------------------------------------------------------------------------*/
93/*---------------------------------------------------------------------------*/
94
95void SharedMemoryParallelDispatchBase::
96_genericAllToAll(ConstMemoryView send_buf,MutableMemoryView recv_buf,Int32 count)
97{
98 Int32 nb_rank = m_nb_rank;
99
100 //TODO: Faire une version sans allocation
101 Int32UniqueArray send_count(nb_rank,count);
102 Int32UniqueArray recv_count(nb_rank,count);
103
104 Int32UniqueArray send_indexes(nb_rank);
105 Int32UniqueArray recv_indexes(nb_rank);
106 for( Integer i=0; i<nb_rank; ++i ){
107 send_indexes[i] = count * i;
108 recv_indexes[i] = count * i;
109 }
110 _genericAllToAllVariable(send_buf,send_count,send_indexes,recv_buf,recv_count,recv_indexes);
111}
112
113/*---------------------------------------------------------------------------*/
114/*---------------------------------------------------------------------------*/
115
/*!
 * \brief Shared-memory implementation of AllToAllVariable.
 *
 * Each rank publishes its send/receive descriptors in \a m_alltoallv_infos,
 * waits on the barrier so every rank's descriptors are visible, then copies
 * directly, from each peer's send buffer, the slice addressed to this rank.
 * The final barrier guarantees no rank reuses or frees its buffers while
 * others are still reading them.
 */
void SharedMemoryParallelDispatchBase::
_genericAllToAllVariable(ConstMemoryView send_buf,
                         Span<const Int32> send_count,
                         Span<const Int32> send_index,
                         MutableMemoryView recv_buf,
                         Span<const Int32> recv_count,
                         Span<const Int32> recv_index
                         )
{
  m_alltoallv_infos.send_buf = send_buf;
  m_alltoallv_infos.send_count = send_count;
  m_alltoallv_infos.send_index = send_index;
  m_alltoallv_infos.recv_buf = recv_buf;
  m_alltoallv_infos.recv_count = recv_count;
  m_alltoallv_infos.recv_index = recv_index;
  _collectiveBarrier();
  Integer global_index = 0;
  Int32 my_rank = m_rank;
  MutableMemoryView recv_mem_buf(recv_buf);
  for( Integer i=0; i<m_nb_rank; ++i ){
    AllToAllVariableInfo ainfo = m_all_dispatchs_base[i]->m_alltoallv_infos;
    ConstMemoryView view(ainfo.send_buf);
    // What rank 'i' sends to us is described by its own send_index/send_count.
    Integer index = ainfo.send_index[my_rank];
    Integer count = ainfo.send_count[my_rank];
    // NOTE(review): data is written contiguously in rank order using
    // 'global_index'; the published recv_count/recv_index are not consulted
    // here, which assumes recv_index is the contiguous prefix sum of
    // recv_count — TODO confirm.
    MemoryUtils::copyHost(recv_mem_buf.subView(global_index,count), view.subView(index,count));
    global_index += count;
  }
  _collectiveBarrier();
}
145
146/*---------------------------------------------------------------------------*/
147/*---------------------------------------------------------------------------*/
148
149void SharedMemoryParallelDispatchBase::
150_genericAllGather(ConstMemoryView send_buf,MutableMemoryView recv_buf)
151{
152 m_const_view = send_buf;
153 _collectiveBarrier();
154 MutableMemoryView recv_mem_view(recv_buf);
155 Int64 index = 0;
156 for( Int32 i=0; i<m_nb_rank; ++i ){
157 ConstMemoryView view(m_all_dispatchs_base[i]->m_const_view);
158 Int64 size = view.nbElement();
159 MemoryUtils::copyHost(recv_mem_view.subView(index,size), view);
160 index += size;
161 }
162 _collectiveBarrier();
163}
164
165/*---------------------------------------------------------------------------*/
166/*---------------------------------------------------------------------------*/
167
168void SharedMemoryParallelDispatchBase::
169_genericAllGatherVariable(ConstMemoryView send_buf,IResizableArray* recv_buf)
170{
171 m_const_view = send_buf;
172 _collectiveBarrier();
173 Int64 total_size = 0;
174 for( Integer i=0; i<m_nb_rank; ++i ){
175 total_size += m_all_dispatchs_base[i]->m_const_view.nbElement();
176 }
177 recv_buf->resize(total_size);
178 MutableMemoryView recv_mem_view(recv_buf->memoryView());
179 Int64 index = 0;
180 for( Integer i=0; i<m_nb_rank; ++i ){
181 ConstMemoryView view(m_all_dispatchs_base[i]->m_const_view);
182 Int64 size = view.nbElement();
183 MemoryUtils::copyHost(recv_mem_view.subView(index,size), view);
184 index += size;
185 }
186 _collectiveBarrier();
187}
188
189/*---------------------------------------------------------------------------*/
190/*---------------------------------------------------------------------------*/
191
192void SharedMemoryParallelDispatchBase::
193_genericScatterVariable(ConstMemoryView send_buf,MutableMemoryView recv_buf,Int32 root)
194{
195 m_const_view = send_buf;
196 m_recv_view = recv_buf;
197 _collectiveBarrier();
198 if (m_rank==root){
199 ConstMemoryView const_view(m_const_view);
200 Int64 index = 0;
201 for( Integer i=0; i<m_nb_rank; ++i ){
202 MutableMemoryView view(m_all_dispatchs_base[i]->m_recv_view);
203 Int64 size = view.nbElement();
204 MemoryUtils::copyHost(view, const_view.subView(index,size));
205 index += size;
206 }
207 }
208 _collectiveBarrier();
209}
210
211/*---------------------------------------------------------------------------*/
212/*---------------------------------------------------------------------------*/
213
214Request SharedMemoryParallelDispatchBase::
215_genericSend(ConstMemoryView send_buffer,const PointToPointMessageInfo& message2)
216{
217 PointToPointMessageInfo message(message2);
218 message.setEmiterRank(MessageRank(m_rank));
219 bool is_blocking = message.isBlocking();
220 if (message.isRankTag()){
221 Request r = m_message_queue->addSend(message,SendBufferInfo(send_buffer));
222 if (is_blocking){
223 m_message_queue->waitAll(ArrayView<Request>(1,&r));
224 return Request();
225 }
226 return r;
227 }
228 if (message.isMessageId()){
229 // Le send avec un MessageId n'existe pas.
230 ARCCORE_THROW(NotSupportedException,"Invalid generic send with MessageId");
231 }
232 ARCCORE_THROW(NotSupportedException,"Invalid message_info");
233}
234
235/*---------------------------------------------------------------------------*/
236/*---------------------------------------------------------------------------*/
237
238Request SharedMemoryParallelDispatchBase::
239_genericReceive(MutableMemoryView recv_buffer,const PointToPointMessageInfo& message2)
240{
241 PointToPointMessageInfo message(message2);
242 bool is_blocking = message.isBlocking();
243 message.setEmiterRank(MessageRank(m_rank));
244 ReceiveBufferInfo buf{recv_buffer};
245 Request r = m_message_queue->addReceive(message,buf);
246 if (is_blocking){
247 m_message_queue->waitAll(ArrayView<Request>(1,&r));
248 return MP::Request();
249 }
250 return r;
251}
252
253/*---------------------------------------------------------------------------*/
254/*---------------------------------------------------------------------------*/
255
256void SharedMemoryParallelDispatchBase::
257_genericBroadcast(MutableMemoryView send_buf,Int32 rank)
258{
259 m_broadcast_view = send_buf;
260 _collectiveBarrier();
261 MemoryUtils::copyHost(m_broadcast_view, m_all_dispatchs_base[rank]->m_broadcast_view);
262 _collectiveBarrier();
263}
264
265/*---------------------------------------------------------------------------*/
266/*---------------------------------------------------------------------------*/
267
268/*---------------------------------------------------------------------------*/
269/*---------------------------------------------------------------------------*/
270
/*!
 * \brief Builds the typed dispatcher and registers it, at its rank's slot,
 * in the arrays shared by all threads so the collective operations can
 * reach it.
 */
template<class Type> SharedMemoryParallelDispatch<Type>::
SharedMemoryParallelDispatch(ITraceMng* tm,SharedMemoryParallelMng* parallel_mng,
                             ISharedMemoryMessageQueue* message_queue,
                             impl::ShareMemoryDispatcherContainer<Type>& containers)
: BaseClass(tm,parallel_mng,message_queue,containers.all_dispatchs_base)
, m_all_dispatchs(containers.all_dispatchs)
{
  // Collective-call counter used by _allReduceOrScan() to detect
  // mismatched collectives between threads.
  m_reduce_infos.m_index = 0;
  // Make this dispatcher visible to the other ranks (threads).
  m_all_dispatchs[m_rank] = this;
  m_all_dispatchs_base[m_rank] = this;
}
282
283/*---------------------------------------------------------------------------*/
284/*---------------------------------------------------------------------------*/
285
template<class Type> SharedMemoryParallelDispatch<Type>::
~SharedMemoryParallelDispatch()
{
  finalize();
}
291
292/*---------------------------------------------------------------------------*/
293/*---------------------------------------------------------------------------*/
294
//! Releases resources held by the dispatcher. Currently nothing to do.
template<class Type> void SharedMemoryParallelDispatch<Type>::
finalize()
{
}
299
300/*---------------------------------------------------------------------------*/
301/*---------------------------------------------------------------------------*/
302
303template<typename T>
304class _ThreadIntegralType
305{
306 public:
307 typedef FalseType IsIntegral;
308};
309
/*!
 * Declares that the min/max/sum reduction is implemented for \a datatype.
 * Note: despite the macro's name, the trait is also enabled for
 * floating-point types (double, float, HPReal), not only integral ones.
 */
#define ARCANE_DEFINE_INTEGRAL_TYPE(datatype)\
template<>\
class _ThreadIntegralType<datatype>\
{\
 public:\
  typedef TrueType IsIntegral;\
}

ARCANE_DEFINE_INTEGRAL_TYPE(long long);
ARCANE_DEFINE_INTEGRAL_TYPE(long);
ARCANE_DEFINE_INTEGRAL_TYPE(int);
ARCANE_DEFINE_INTEGRAL_TYPE(short);
ARCANE_DEFINE_INTEGRAL_TYPE(unsigned long long);
ARCANE_DEFINE_INTEGRAL_TYPE(unsigned long);
ARCANE_DEFINE_INTEGRAL_TYPE(unsigned int);
ARCANE_DEFINE_INTEGRAL_TYPE(unsigned short);
ARCANE_DEFINE_INTEGRAL_TYPE(double);
ARCANE_DEFINE_INTEGRAL_TYPE(float);
ARCANE_DEFINE_INTEGRAL_TYPE(HPReal);
329
330/*---------------------------------------------------------------------------*/
331/*---------------------------------------------------------------------------*/
332
333namespace
334{
335
336template<class Type> void
337_computeMinMaxSum2(ArrayView<SharedMemoryParallelDispatch<Type>*> all_dispatchs,
338 Type& min_val,Type& max_val,Type& sum_val,
339 Int32& min_rank,Int32& max_rank,Int32 nb_rank,FalseType)
340{
341 ARCANE_UNUSED(all_dispatchs);
342 ARCANE_UNUSED(min_val);
343 ARCANE_UNUSED(max_val);
344 ARCANE_UNUSED(sum_val);
345 ARCANE_UNUSED(min_rank);
346 ARCANE_UNUSED(max_rank);
347 ARCANE_UNUSED(nb_rank);
348
349 throw NotImplementedException(A_FUNCINFO);
350}
351
352/*---------------------------------------------------------------------------*/
353/*---------------------------------------------------------------------------*/
354
355template<class Type> void
356_computeMinMaxSum2(ArrayView<SharedMemoryParallelDispatch<Type>*> all_dispatchs,
357 Type& min_val,Type& max_val,Type& sum_val,
358 Int32& min_rank,Int32& max_rank,Int32 nb_rank,TrueType)
359{
360 Type _min_val = all_dispatchs[0]->m_reduce_infos.reduce_value;
361 Type _max_val = _min_val;
362 Type _sum_val = _min_val;
363 Integer _min_rank = 0;
364 Integer _max_rank = 0;
365 for( Integer i=1; i<nb_rank; ++i ){
366 Type cval = all_dispatchs[i]->m_reduce_infos.reduce_value;
367 if (cval<_min_val){
368 _min_val = cval;
369 _min_rank = i;
370 }
371 if (cval>_max_val){
372 _max_val = cval;
373 _max_rank = i;
374 }
375 _sum_val = (Type)(_sum_val + cval);
376 }
377 min_val = _min_val;
378 max_val = _max_val;
379 sum_val = _sum_val;
380 min_rank = _min_rank;
381 max_rank = _max_rank;
382}
383
384}
385
386/*---------------------------------------------------------------------------*/
387/*---------------------------------------------------------------------------*/
388
/*!
 * \brief Computes, in a single collective, the min, max and sum of \a val
 * over all ranks, plus the ranks holding the min and the max.
 */
template<class Type> void SharedMemoryParallelDispatch<Type>::
computeMinMaxSum(Type val,Type& min_val,Type& max_val,Type& sum_val,
                 Int32& min_rank,Int32& max_rank)
{
  // Dispatch on the trait: types without reduction support end up in the
  // FalseType overload, which throws NotImplementedException.
  typedef typename _ThreadIntegralType<Type>::IsIntegral IntegralType;
  m_reduce_infos.reduce_value = val;
  _collectiveBarrier();
  _computeMinMaxSum2(m_all_dispatchs,min_val,max_val,sum_val,min_rank,max_rank,m_nb_rank,IntegralType());
  _collectiveBarrier();
}
399
400/*---------------------------------------------------------------------------*/
401/*---------------------------------------------------------------------------*/
402
/*!
 * \brief Element-wise min/max/sum reduction over arrays.
 *
 * Every rank must call this with the same \a values.size(), since each
 * element performs its own pair of collective barriers.
 */
template<class Type> void SharedMemoryParallelDispatch<Type>::
computeMinMaxSum(ConstArrayView<Type> values,
                 ArrayView<Type> min_values,
                 ArrayView<Type> max_values,
                 ArrayView<Type> sum_values,
                 ArrayView<Int32> min_ranks,
                 ArrayView<Int32> max_ranks)
{
  // Sub-optimal implementation that does not vectorize the computation
  // (currently a copy-paste of the scalar version put inside a loop).
  typedef typename _ThreadIntegralType<Type>::IsIntegral IntegralType;
  Integer n = values.size();
  for(Integer i=0;i<n;++i) {
    m_reduce_infos.reduce_value = values[i];
    _collectiveBarrier();
    _computeMinMaxSum2(m_all_dispatchs,min_values[i],max_values[i],sum_values[i],
                       min_ranks[i],max_ranks[i],m_nb_rank,IntegralType());
    _collectiveBarrier();
  }
}
423
424/*---------------------------------------------------------------------------*/
425/*---------------------------------------------------------------------------*/
426
//! Broadcasts rank \a rank's \a send_buf to every rank (typed wrapper).
template<class Type> void SharedMemoryParallelDispatch<Type>::
broadcast(Span<Type> send_buf,Int32 rank)
{
  _genericBroadcast(MutableMemoryView(send_buf),rank);
}
432
433/*---------------------------------------------------------------------------*/
434/*---------------------------------------------------------------------------*/
435
//! Gathers every rank's \a send_buf, in rank order, into \a recv_buf (typed wrapper).
template<class Type> void SharedMemoryParallelDispatch<Type>::
allGather(Span<const Type> send_buf,Span<Type> recv_buf)
{
  _genericAllGather(ConstMemoryView{send_buf},MutableMemoryView{recv_buf});
}
441
442/*---------------------------------------------------------------------------*/
443/*---------------------------------------------------------------------------*/
444
/*!
 * \brief Gather toward \a root_rank.
 *
 * Implemented on top of allGather(): non-root ranks also take part in the
 * collective but gather into a throw-away buffer. The temporary buffer is
 * sized send_buf.size()*m_nb_rank, which assumes every rank sends the same
 * number of elements.
 */
template<class Type> void SharedMemoryParallelDispatch<Type>::
gather(Span<const Type> send_buf,Span<Type> recv_buf,Int32 root_rank)
{
  UniqueArray<Type> tmp_buf;
  if (m_rank==root_rank)
    allGather(send_buf,recv_buf);
  else{
    tmp_buf.resize(send_buf.size() * m_nb_rank);
    allGather(send_buf,tmp_buf);
  }
}
456
457/*---------------------------------------------------------------------------*/
458/*---------------------------------------------------------------------------*/
459
/*!
 * \brief Variable-size allGather: \a recv_buf is resized to the total
 * number of elements and filled in rank order (typed wrapper).
 */
template<class Type> void SharedMemoryParallelDispatch<Type>::
allGatherVariable(Span<const Type> send_buf,Array<Type>& recv_buf)
{
  ResizableArrayRef recv_buf_ref(recv_buf);
  _genericAllGatherVariable(ConstMemoryView(send_buf),&recv_buf_ref);
}
466
467/*---------------------------------------------------------------------------*/
468/*---------------------------------------------------------------------------*/
469
470template<class Type> void SharedMemoryParallelDispatch<Type>::
471gatherVariable(Span<const Type> send_buf,Array<Type>& recv_buf,Int32 root_rank)
472{
473 UniqueArray<Type> tmp_buf;
474 if (m_rank==root_rank)
475 allGatherVariable(send_buf,recv_buf);
476 else
477 allGatherVariable(send_buf,tmp_buf);
478}
479
480/*---------------------------------------------------------------------------*/
481/*---------------------------------------------------------------------------*/
482
//! Scatters slices of \a send_buf from \a root into each rank's \a recv_buf (typed wrapper).
template<class Type> void SharedMemoryParallelDispatch<Type>::
scatterVariable(Span<const Type> send_buf,Span<Type> recv_buf,Int32 root)
{
  _genericScatterVariable(ConstMemoryView(send_buf),MutableMemoryView(recv_buf),root);
}
488
489/*---------------------------------------------------------------------------*/
490/*---------------------------------------------------------------------------*/
491
//! Fixed-size allToAll: each rank exchanges \a count elements with every rank (typed wrapper).
template<class Type> void SharedMemoryParallelDispatch<Type>::
allToAll(Span<const Type> send_buf,Span<Type> recv_buf,Int32 count)
{
  _genericAllToAll(ConstMemoryView(send_buf),MutableMemoryView(recv_buf),count);
}
497
498/*---------------------------------------------------------------------------*/
499/*---------------------------------------------------------------------------*/
500
//! Variable-size allToAll (typed wrapper over the generic implementation).
template<class Type> void SharedMemoryParallelDispatch<Type>::
allToAllVariable(Span<const Type> send_buf, ConstArrayView<Int32> send_count,
                 ConstArrayView<Int32> send_index,
                 Span<Type> recv_buf, ConstArrayView<Int32> recv_count,
                 Int32ConstArrayView recv_index
                 )
{
  _genericAllToAllVariable(ConstMemoryView(send_buf), send_count, send_index,
                           MutableMemoryView(recv_buf), recv_count, recv_index);
}
511
512/*---------------------------------------------------------------------------*/
513/*---------------------------------------------------------------------------*/
514
515template<class Type> auto SharedMemoryParallelDispatch<Type>::
516send(Span<const Type> send_buffer,Int32 rank,bool is_blocking) -> Request
517{
518 auto block_mode = (is_blocking) ? Parallel::Blocking : Parallel::NonBlocking;
519 auto p2p_message = m_parallel_mng->buildMessage(rank,block_mode);
520 return send(send_buffer,p2p_message);
521}
522
523/*---------------------------------------------------------------------------*/
524/*---------------------------------------------------------------------------*/
525
//! Blocking send of \a send_buf to \a rank (legacy ConstArrayView overload).
template<class Type> void SharedMemoryParallelDispatch<Type>::
send(ConstArrayView<Type> send_buf,Int32 rank)
{
  send(send_buf,rank,true);
}
531
532/*---------------------------------------------------------------------------*/
533/*---------------------------------------------------------------------------*/
534
535template<class Type> Parallel::Request SharedMemoryParallelDispatch<Type>::
536receive(Span<Type> recv_buffer,Int32 rank,bool is_blocking)
537{
538 auto block_mode = (is_blocking) ? Parallel::Blocking : Parallel::NonBlocking;
539 auto p2p_message = m_parallel_mng->buildMessage(rank,block_mode);
540 return receive(recv_buffer,p2p_message);
541}
542
543/*---------------------------------------------------------------------------*/
544/*---------------------------------------------------------------------------*/
545
//! Sends \a send_buffer as described by \a message2 (typed wrapper).
template<class Type> Request SharedMemoryParallelDispatch<Type>::
send(Span<const Type> send_buffer,const PointToPointMessageInfo& message2)
{
  return _genericSend(ConstMemoryView(send_buffer),message2);
}
551
552/*---------------------------------------------------------------------------*/
553/*---------------------------------------------------------------------------*/
554
//! Receives into \a recv_buffer as described by \a message2 (typed wrapper).
template<class Type> Request SharedMemoryParallelDispatch<Type>::
receive(Span<Type> recv_buffer,const PointToPointMessageInfo& message2)
{
  return _genericReceive(MutableMemoryView(recv_buffer),message2);
}
560
561/*---------------------------------------------------------------------------*/
562/*---------------------------------------------------------------------------*/
563
//! Blocking receive from \a rank (legacy ArrayView overload).
template<class Type> void SharedMemoryParallelDispatch<Type>::
recv(ArrayView<Type> recv_buffer,Integer rank)
{
  recv(recv_buffer,rank,true);
}
569
570/*---------------------------------------------------------------------------*/
571/*---------------------------------------------------------------------------*/
572
//! Combined send/receive: not implemented for shared-memory message passing.
template<class Type> void SharedMemoryParallelDispatch<Type>::
sendRecv(ConstArrayView<Type> send_buffer,ArrayView<Type> recv_buffer,Integer proc)
{
  ARCANE_UNUSED(send_buffer);
  ARCANE_UNUSED(recv_buffer);
  ARCANE_UNUSED(proc);
  throw NotImplementedException(A_FUNCINFO);
}
581
582/*---------------------------------------------------------------------------*/
583/*---------------------------------------------------------------------------*/
584
585template<class Type> Type SharedMemoryParallelDispatch<Type>::
586allReduce(eReduceType op,Type send_buf)
587{
588 m_reduce_infos.reduce_value = send_buf;
589 //cout << "ALL REDUCE BEGIN RANk=" << m_rank << " TYPE=" << (int)op << " MY=" << send_buf << '\n';
590 cout.flush();
591 _collectiveBarrier();
592 Type ret = m_all_dispatchs[0]->m_reduce_infos.reduce_value;
593 switch(op){
594 case Parallel::ReduceMin:
595 for( Integer i=1; i<m_nb_rank; ++i )
596 ret = math::min(ret,m_all_dispatchs[i]->m_reduce_infos.reduce_value);
597 break;
598 case Parallel::ReduceMax:
599 for( Integer i=1; i<m_nb_rank; ++i )
600 ret = math::max(ret,m_all_dispatchs[i]->m_reduce_infos.reduce_value);
601 break;
602 case Parallel::ReduceSum:
603 for( Integer i=1; i<m_nb_rank; ++i )
604 ret = (Type)(ret + m_all_dispatchs[i]->m_reduce_infos.reduce_value);
605 break;
606 default:
607 ARCANE_FATAL("Bad reduce type {0}",(int)op);
608 }
609 //cout << "ALL REDUCE RANK=" << m_rank << " TYPE=" << (int)op << " MY=" << send_buf << " GLOBAL=" << ret << '\n';
610 _collectiveBarrier();
611 return ret;
612}
613
614/*---------------------------------------------------------------------------*/
615/*---------------------------------------------------------------------------*/
616
/*!
 * \brief Implementation shared by allReduce() and scan() on arrays.
 *
 * Each rank publishes its buffer, waits on the barrier, then combines the
 * buffers element-wise into a local temporary. For a scan (\a is_scan true),
 * rank r only combines the buffers of ranks [0,r], which yields an
 * inclusive prefix reduction. The result is written back into \a send_buf
 * only after the second barrier, i.e. once no rank can still be reading
 * the published buffers.
 *
 * m_index counts the collectives executed by each rank and is checked for
 * coherence across ranks, to detect mismatched collective calls between
 * threads (see the note at the top of this file).
 */
template<class Type> void SharedMemoryParallelDispatch<Type>::
_allReduceOrScan(eReduceType op, Span<Type> send_buf, bool is_scan)
{
  m_reduce_infos.reduce_buf = send_buf;
  ++m_reduce_infos.m_index;
  Int64 buf_size = send_buf.size();
  UniqueArray<Type> ret(buf_size);
  _collectiveBarrier();
  {
    // Coherence check: every rank must be executing the same n-th collective.
    Integer index0 = m_all_dispatchs[0]->m_reduce_infos.m_index;
    for( Integer i=0; i<m_nb_rank; ++i ){
      Integer indexi = m_all_dispatchs[i]->m_reduce_infos.m_index;
      if (index0!=m_all_dispatchs[i]->m_reduce_infos.m_index){
        ARCANE_FATAL("INTERNAL: incoherent all reduce i0={0} in={1} n={2}",
                     index0,indexi,i);
      }
    }
  }
  // For a scan, only ranks [0,m_rank] contribute to this rank's result.
  Int32 nb_rank = m_nb_rank;
  if (is_scan)
    nb_rank = m_rank + 1;
  for( Integer j=0; j<buf_size; ++j )
    ret[j] = m_all_dispatchs[0]->m_reduce_infos.reduce_buf[j];
  switch(op){
  case Parallel::ReduceMin:
    for (Integer i = 1; i < nb_rank; ++i)
      for( Integer j=0; j<buf_size; ++j )
        ret[j] = math::min(ret[j],m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
    break;
  case Parallel::ReduceMax:
    for (Integer i = 1; i < nb_rank; ++i)
      for( Integer j=0; j<buf_size; ++j )
        ret[j] = math::max(ret[j],m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
    break;
  case Parallel::ReduceSum:
    for (Integer i = 1; i < nb_rank; ++i)
      for( Integer j=0; j<buf_size; ++j )
        ret[j] = (Type)(ret[j] + m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
    break;
  default:
    ARCANE_FATAL("Bad reduce type");
  }
  _collectiveBarrier();
  // Safe to overwrite the published buffer: all ranks passed the barrier.
  for( Integer j=0; j<buf_size; ++j )
    send_buf[j] = ret[j];
}
666
667/*---------------------------------------------------------------------------*/
668/*---------------------------------------------------------------------------*/
669
//! In-place element-wise allReduce of \a send_buf over all ranks.
template <class Type> void SharedMemoryParallelDispatch<Type>::
allReduce(eReduceType op, Span<Type> send_buf)
{
  _allReduceOrScan(op, send_buf, false);
}
675
676/*---------------------------------------------------------------------------*/
677/*---------------------------------------------------------------------------*/
678
//! Non-blocking allReduce: not implemented for shared-memory message passing.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingAllReduce(eReduceType op,Span<const Type> send_buf,Span<Type> recv_buf)
{
  ARCANE_UNUSED(op);
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  throw NotImplementedException(A_FUNCINFO);
}
687
688/*---------------------------------------------------------------------------*/
689/*---------------------------------------------------------------------------*/
690
//! Non-blocking allGather: not implemented for shared-memory message passing.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingAllGather(Span<const Type> send_buf, Span<Type> recv_buf)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  throw NotImplementedException(A_FUNCINFO);
}
698
699/*---------------------------------------------------------------------------*/
700/*---------------------------------------------------------------------------*/
701
//! Non-blocking broadcast: not implemented for shared-memory message passing.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingBroadcast(Span<Type> send_buf, Int32 rank)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(rank);
  throw NotImplementedException(A_FUNCINFO);
}
709
710/*---------------------------------------------------------------------------*/
711/*---------------------------------------------------------------------------*/
712
//! Non-blocking gather: not implemented for shared-memory message passing.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingGather(Span<const Type> send_buf, Span<Type> recv_buf, Int32 rank)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  ARCANE_UNUSED(rank);
  throw NotImplementedException(A_FUNCINFO);
}
721
722/*---------------------------------------------------------------------------*/
723/*---------------------------------------------------------------------------*/
724
//! Non-blocking allToAll: not implemented for shared-memory message passing.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingAllToAll(Span<const Type> send_buf, Span<Type> recv_buf, Int32 count)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  ARCANE_UNUSED(count);
  throw NotImplementedException(A_FUNCINFO);
}
733
734/*---------------------------------------------------------------------------*/
735/*---------------------------------------------------------------------------*/
736
//! Non-blocking allToAllVariable: not implemented for shared-memory message passing.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingAllToAllVariable(Span<const Type> send_buf, ConstArrayView<Int32> send_count,
                            ConstArrayView<Int32> send_index, Span<Type> recv_buf,
                            ConstArrayView<Int32> recv_count, ConstArrayView<Int32> recv_index)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  ARCANE_UNUSED(send_count);
  ARCANE_UNUSED(recv_count);
  ARCANE_UNUSED(send_index);
  ARCANE_UNUSED(recv_index);
  throw NotImplementedException(A_FUNCINFO);
}
750
751/*---------------------------------------------------------------------------*/
752/*---------------------------------------------------------------------------*/
753
//! Scalar scan: not implemented (only the array overload is, see below).
template<class Type> Type SharedMemoryParallelDispatch<Type>::
scan(eReduceType op,Type send_buf)
{
  ARCANE_UNUSED(op);
  ARCANE_UNUSED(send_buf);
  throw NotImplementedException(A_FUNCINFO);
}
761
762/*---------------------------------------------------------------------------*/
763/*---------------------------------------------------------------------------*/
764
//! In-place element-wise inclusive prefix reduction (scan) of \a send_buf.
template<class Type> void SharedMemoryParallelDispatch<Type>::
scan(eReduceType op,ArrayView<Type> send_buf)
{
  _allReduceOrScan(op, send_buf, true);
}
770
771/*---------------------------------------------------------------------------*/
772/*---------------------------------------------------------------------------*/
773
//! Waits for all pending requests: not implemented.
template<class Type> void SharedMemoryParallelDispatch<Type>::
waitAll()
{
  // TEMPORARY: apparently not used.
  throw NotImplementedException(A_FUNCINFO);
}
780
781/*---------------------------------------------------------------------------*/
782/*---------------------------------------------------------------------------*/
783
784template<class Type> Request SharedMemoryParallelDispatch<Type>::
786{
787 throw NotImplementedException(A_FUNCINFO);
788}
789
790/*---------------------------------------------------------------------------*/
791/*---------------------------------------------------------------------------*/
792
// Explicit instantiations for every data type handled by the dispatchers.
template class SharedMemoryParallelDispatch<char>;
template class SharedMemoryParallelDispatch<signed char>;
template class SharedMemoryParallelDispatch<unsigned char>;
template class SharedMemoryParallelDispatch<short>;
template class SharedMemoryParallelDispatch<unsigned short>;
template class SharedMemoryParallelDispatch<int>;
template class SharedMemoryParallelDispatch<unsigned int>;
template class SharedMemoryParallelDispatch<long>;
template class SharedMemoryParallelDispatch<unsigned long>;
template class SharedMemoryParallelDispatch<long long>;
template class SharedMemoryParallelDispatch<unsigned long long>;
template class SharedMemoryParallelDispatch<float>;
template class SharedMemoryParallelDispatch<double>;
template class SharedMemoryParallelDispatch<long double>;
template class SharedMemoryParallelDispatch<APReal>;
template class SharedMemoryParallelDispatch<Real2>;
template class SharedMemoryParallelDispatch<Real3>;
template class SharedMemoryParallelDispatch<Real2x2>;
template class SharedMemoryParallelDispatch<Real3x3>;
template class SharedMemoryParallelDispatch<HPReal>;
813
814/*---------------------------------------------------------------------------*/
815/*---------------------------------------------------------------------------*/
816
817} // End namespace Arcane::MessagePassing
818
819/*---------------------------------------------------------------------------*/
820/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Fonctions de gestion mémoire et des allocateurs.
Informations pour un message 'gather' pour le type de données DataType.
Interface d'une file de messages avec les threads.
Gestionnaire du parallélisme utilisant les threads.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Int32 Integer
Type représentant un entier.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:428
Type
Type of JSON value.
Definition rapidjson.h:665