Arcane  v3.16.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
SharedMemoryParallelDispatch.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SharedMemoryParallelDispatch.cc (C) 2000-2024 */
9/* */
10/* Implémentation des messages en mémoire partagée. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/ArcanePrecomp.h"
15
16#include "arcane/utils/Array.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/String.h"
19#include "arcane/utils/ITraceMng.h"
20#include "arcane/utils/Real2.h"
21#include "arcane/utils/Real3.h"
22#include "arcane/utils/Real2x2.h"
23#include "arcane/utils/Real3x3.h"
24#include "arcane/utils/APReal.h"
25#include "arcane/utils/NotImplementedException.h"
26#include "arcane/utils/MemoryView.h"
27
28#include "arcane/MeshVariable.h"
29#include "arcane/IParallelMng.h"
30#include "arcane/ItemGroup.h"
31#include "arcane/IMesh.h"
32#include "arcane/IBase.h"
33
34#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
35#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
36#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
37
38#include "arccore/message_passing/PointToPointMessageInfo.h"
39
40/*---------------------------------------------------------------------------*/
41/*---------------------------------------------------------------------------*/
42
44{
45
46/*---------------------------------------------------------------------------*/
47/*---------------------------------------------------------------------------*/
48
49/*
50 * TODO: pour simplifier le debug lorsqu'il y a un décalage des appels
51 * collectifs entre les threads, il faudrait faire un type de barrière
52 * par type d'appel collectif alors qu'actuellement tous les appels
53 * collectifs utilisent la même barrière (via _collectiveBarrier()).
54 * A cause de cela, des problèmes peuvent survenir qui ne sont pas
55 * facilement détectable. Par exemple:
56 *
57 * Thread1:
58 * allGather();
59 * barrier();
60 * allReduce();
61 * Thread2:
62 * barrier();
63 * allGather();
64 * allReduce();
65 *
66 * Dans ce cas, le code ne plantera pas mais les valeurs des collectives ne
67 * seront pas bonnes.
68 */
69/*---------------------------------------------------------------------------*/
70/*---------------------------------------------------------------------------*/
71
/*!
 * \brief Builds the dispatcher for one rank (thread) of \a parallel_mng.
 *
 * \param tm trace manager used by TraceAccessor.
 * \param parallel_mng owning shared-memory parallel manager; provides the
 *        rank and number of ranks cached in \a m_rank / \a m_nb_rank.
 * \param message_queue queue used for point-to-point messages.
 * \param all_dispatchs_base view of the dispatchers of all ranks, shared by
 *        the threads; collectives read other ranks' state through it.
 */
SharedMemoryParallelDispatchBase::
SharedMemoryParallelDispatchBase(ITraceMng* tm,SharedMemoryParallelMng* parallel_mng,
                                 ISharedMemoryMessageQueue* message_queue,
                                 ArrayView<SharedMemoryParallelDispatchBase*> all_dispatchs_base)
: TraceAccessor(tm)
, m_parallel_mng(parallel_mng)
, m_rank(parallel_mng->commRank())
, m_nb_rank(parallel_mng->commSize())
, m_message_queue(message_queue)
, m_all_dispatchs_base(all_dispatchs_base)
{
}
84
85/*---------------------------------------------------------------------------*/
86/*---------------------------------------------------------------------------*/
87
/*!
 * \brief Blocks the calling thread until all ranks have reached the barrier.
 *
 * All collective operations synchronize through this single shared barrier
 * (see the note at the top of this file about the debugging consequences).
 */
void SharedMemoryParallelDispatchBase::
_collectiveBarrier()
{
  m_parallel_mng->getThreadBarrier()->wait();
}
93
94/*---------------------------------------------------------------------------*/
95/*---------------------------------------------------------------------------*/
96
97void SharedMemoryParallelDispatchBase::
98_genericAllToAll(ConstMemoryView send_buf,MutableMemoryView recv_buf,Int32 count)
99{
100 Int32 nb_rank = m_nb_rank;
101
102 //TODO: Faire une version sans allocation
103 Int32UniqueArray send_count(nb_rank,count);
104 Int32UniqueArray recv_count(nb_rank,count);
105
106 Int32UniqueArray send_indexes(nb_rank);
107 Int32UniqueArray recv_indexes(nb_rank);
108 for( Integer i=0; i<nb_rank; ++i ){
109 send_indexes[i] = count * i;
110 recv_indexes[i] = count * i;
111 }
112 _genericAllToAllVariable(send_buf,send_count,send_indexes,recv_buf,recv_count,recv_indexes);
113}
114
115/*---------------------------------------------------------------------------*/
116/*---------------------------------------------------------------------------*/
117
/*!
 * \brief Variable all-to-all over shared memory.
 *
 * Rendezvous protocol: each rank first publishes its parameters in its own
 * \a m_alltoallv_infos; the first barrier guarantees every rank's parameters
 * are visible to all. Each rank then pulls, directly from every other rank's
 * published send buffer, the slice destined to itself (selected by
 * \a my_rank in that rank's send_index/send_count). The final barrier
 * prevents any rank from returning (and possibly reusing its buffers)
 * while others are still reading them.
 */
void SharedMemoryParallelDispatchBase::
_genericAllToAllVariable(ConstMemoryView send_buf,
                         Span<const Int32> send_count,
                         Span<const Int32> send_index,
                         MutableMemoryView recv_buf,
                         Span<const Int32> recv_count,
                         Span<const Int32> recv_index
                         )
{
  // Publish this rank's parameters so the other ranks can read them.
  m_alltoallv_infos.send_buf = send_buf;
  m_alltoallv_infos.send_count = send_count;
  m_alltoallv_infos.send_index = send_index;
  m_alltoallv_infos.recv_buf = recv_buf;
  m_alltoallv_infos.recv_count = recv_count;
  m_alltoallv_infos.recv_index = recv_index;
  _collectiveBarrier();
  // Receive slices in rank order, appending them contiguously.
  Integer global_index = 0;
  Int32 my_rank = m_rank;
  MutableMemoryView recv_mem_buf(recv_buf);
  for( Integer i=0; i<m_nb_rank; ++i ){
    AllToAllVariableInfo ainfo = m_all_dispatchs_base[i]->m_alltoallv_infos;
    ConstMemoryView view(ainfo.send_buf);
    Integer index = ainfo.send_index[my_rank];
    Integer count = ainfo.send_count[my_rank];
    recv_mem_buf.subView(global_index,count).copyHost(view.subView(index,count));
    global_index += count;
  }
  _collectiveBarrier();
}
147
148/*---------------------------------------------------------------------------*/
149/*---------------------------------------------------------------------------*/
150
151void SharedMemoryParallelDispatchBase::
152_genericAllGather(ConstMemoryView send_buf,MutableMemoryView recv_buf)
153{
154 m_const_view = send_buf;
155 _collectiveBarrier();
156 MutableMemoryView recv_mem_view(recv_buf);
157 Int64 index = 0;
158 for( Int32 i=0; i<m_nb_rank; ++i ){
159 ConstMemoryView view(m_all_dispatchs_base[i]->m_const_view);
160 Int64 size = view.nbElement();
161 recv_mem_view.subView(index,size).copyHost(view);
162 index += size;
163 }
164 _collectiveBarrier();
165}
166
167/*---------------------------------------------------------------------------*/
168/*---------------------------------------------------------------------------*/
169
170void SharedMemoryParallelDispatchBase::
171_genericAllGatherVariable(ConstMemoryView send_buf,IResizableArray* recv_buf)
172{
173 m_const_view = send_buf;
174 _collectiveBarrier();
175 Int64 total_size = 0;
176 for( Integer i=0; i<m_nb_rank; ++i ){
177 total_size += m_all_dispatchs_base[i]->m_const_view.nbElement();
178 }
179 recv_buf->resize(total_size);
180 MutableMemoryView recv_mem_view(recv_buf->memoryView());
181 Int64 index = 0;
182 for( Integer i=0; i<m_nb_rank; ++i ){
183 ConstMemoryView view(m_all_dispatchs_base[i]->m_const_view);
184 Int64 size = view.nbElement();
185 recv_mem_view.subView(index,size).copyHost(view);
186 index += size;
187 }
188 _collectiveBarrier();
189}
190
191/*---------------------------------------------------------------------------*/
192/*---------------------------------------------------------------------------*/
193
194void SharedMemoryParallelDispatchBase::
195_genericScatterVariable(ConstMemoryView send_buf,MutableMemoryView recv_buf,Int32 root)
196{
197 m_const_view = send_buf;
198 m_recv_view = recv_buf;
199 _collectiveBarrier();
200 if (m_rank==root){
201 ConstMemoryView const_view(m_const_view);
202 Int64 index = 0;
203 for( Integer i=0; i<m_nb_rank; ++i ){
204 MutableMemoryView view(m_all_dispatchs_base[i]->m_recv_view);
205 Int64 size = view.nbElement();
206 view.copyHost(const_view.subView(index,size));
207 index += size;
208 }
209 }
210 _collectiveBarrier();
211}
212
213/*---------------------------------------------------------------------------*/
214/*---------------------------------------------------------------------------*/
215
216Request SharedMemoryParallelDispatchBase::
217_genericSend(ConstMemoryView send_buffer,const PointToPointMessageInfo& message2)
218{
219 PointToPointMessageInfo message(message2);
220 message.setEmiterRank(MessageRank(m_rank));
221 bool is_blocking = message.isBlocking();
222 if (message.isRankTag()){
223 Request r = m_message_queue->addSend(message,SendBufferInfo(send_buffer));
224 if (is_blocking){
225 m_message_queue->waitAll(ArrayView<Request>(1,&r));
226 return Request();
227 }
228 return r;
229 }
230 if (message.isMessageId()){
231 // Le send avec un MessageId n'existe pas.
232 ARCCORE_THROW(NotSupportedException,"Invalid generic send with MessageId");
233 }
234 ARCCORE_THROW(NotSupportedException,"Invalid message_info");
235}
236
237/*---------------------------------------------------------------------------*/
238/*---------------------------------------------------------------------------*/
239
240Request SharedMemoryParallelDispatchBase::
241_genericReceive(MutableMemoryView recv_buffer,const PointToPointMessageInfo& message2)
242{
243 PointToPointMessageInfo message(message2);
244 bool is_blocking = message.isBlocking();
245 message.setEmiterRank(MessageRank(m_rank));
246 ReceiveBufferInfo buf{recv_buffer};
247 Request r = m_message_queue->addReceive(message,buf);
248 if (is_blocking){
249 m_message_queue->waitAll(ArrayView<Request>(1,&r));
250 return MP::Request();
251 }
252 return r;
253}
254
255/*---------------------------------------------------------------------------*/
256/*---------------------------------------------------------------------------*/
257
/*!
 * \brief Broadcast: every rank copies rank \a rank's published view into
 * its own \a send_buf (the root copies onto itself).
 *
 * The first barrier guarantees the root's view is published before anyone
 * reads it; the second keeps the root's buffer alive until all have copied.
 */
void SharedMemoryParallelDispatchBase::
_genericBroadcast(MutableMemoryView send_buf,Int32 rank)
{
  m_broadcast_view = send_buf;
  _collectiveBarrier();
  m_broadcast_view.copyHost(m_all_dispatchs_base[rank]->m_broadcast_view);
  _collectiveBarrier();
}
266
267/*---------------------------------------------------------------------------*/
268/*---------------------------------------------------------------------------*/
269
270/*---------------------------------------------------------------------------*/
271/*---------------------------------------------------------------------------*/
272
/*!
 * \brief Builds the typed dispatcher for one rank and registers it in the
 * shared dispatcher tables.
 *
 * Registers \a this in both the typed (\a all_dispatchs) and untyped
 * (\a m_all_dispatchs_base) slot of its own rank so the other ranks'
 * collectives can reach it.
 */
template<class Type> SharedMemoryParallelDispatch<Type>::
SharedMemoryParallelDispatch(ITraceMng* tm,SharedMemoryParallelMng* parallel_mng,
                             ISharedMemoryMessageQueue* message_queue,
                             impl::ShareMemoryDispatcherContainer<Type>& containers)
: BaseClass(tm,parallel_mng,message_queue,containers.all_dispatchs_base)
, m_all_dispatchs(containers.all_dispatchs)
{
  // Collective-call counter used by _allReduceOrScan() coherency checks.
  m_reduce_infos.m_index = 0;
  m_all_dispatchs[m_rank] = this;
  m_all_dispatchs_base[m_rank] = this;
}
284
285/*---------------------------------------------------------------------------*/
286/*---------------------------------------------------------------------------*/
287
//! Destructor: delegates cleanup to finalize() (currently a no-op).
template<class Type> SharedMemoryParallelDispatch<Type>::
~SharedMemoryParallelDispatch()
{
  finalize();
}
293
294/*---------------------------------------------------------------------------*/
295/*---------------------------------------------------------------------------*/
296
//! Releases resources held by the dispatcher. Currently nothing to do.
template<class Type> void SharedMemoryParallelDispatch<Type>::
finalize()
{
}
301
302/*---------------------------------------------------------------------------*/
303/*---------------------------------------------------------------------------*/
304
/*!
 * Compile-time trait selecting the _computeMinMaxSum2() overload: the
 * primary template (IsIntegral = FalseType) routes unsupported types to the
 * NotImplementedException overload; specializations below opt types in.
 */
template<typename T>
class _ThreadIntegralType
{
 public:
  typedef FalseType IsIntegral;
};
311
/*
 * Registers \a datatype as supported by computeMinMaxSum().
 * NOTE: despite the macro name, non-integral types (double, float, HPReal)
 * are also registered below.
 */
#define ARCANE_DEFINE_INTEGRAL_TYPE(datatype)\
template<>\
class _ThreadIntegralType<datatype>\
{\
 public:\
  typedef TrueType IsIntegral;\
}

ARCANE_DEFINE_INTEGRAL_TYPE(long long);
ARCANE_DEFINE_INTEGRAL_TYPE(long);
ARCANE_DEFINE_INTEGRAL_TYPE(int);
ARCANE_DEFINE_INTEGRAL_TYPE(short);
ARCANE_DEFINE_INTEGRAL_TYPE(unsigned long long);
ARCANE_DEFINE_INTEGRAL_TYPE(unsigned long);
ARCANE_DEFINE_INTEGRAL_TYPE(unsigned int);
ARCANE_DEFINE_INTEGRAL_TYPE(unsigned short);
ARCANE_DEFINE_INTEGRAL_TYPE(double);
ARCANE_DEFINE_INTEGRAL_TYPE(float);
ARCANE_DEFINE_INTEGRAL_TYPE(HPReal);
331
332/*---------------------------------------------------------------------------*/
333/*---------------------------------------------------------------------------*/
334
335namespace
336{
337
/*!
 * Fallback overload (selected via FalseType) for types not registered with
 * ARCANE_DEFINE_INTEGRAL_TYPE: computeMinMaxSum() is not implemented there.
 */
template<class Type> void
_computeMinMaxSum2(ArrayView<SharedMemoryParallelDispatch<Type>*> all_dispatchs,
                   Type& min_val,Type& max_val,Type& sum_val,
                   Int32& min_rank,Int32& max_rank,Int32 nb_rank,FalseType)
{
  ARCANE_UNUSED(all_dispatchs);
  ARCANE_UNUSED(min_val);
  ARCANE_UNUSED(max_val);
  ARCANE_UNUSED(sum_val);
  ARCANE_UNUSED(min_rank);
  ARCANE_UNUSED(max_rank);
  ARCANE_UNUSED(nb_rank);

  throw NotImplementedException(A_FUNCINFO);
}
353
354/*---------------------------------------------------------------------------*/
355/*---------------------------------------------------------------------------*/
356
357template<class Type> void
358_computeMinMaxSum2(ArrayView<SharedMemoryParallelDispatch<Type>*> all_dispatchs,
359 Type& min_val,Type& max_val,Type& sum_val,
360 Int32& min_rank,Int32& max_rank,Int32 nb_rank,TrueType)
361{
362 Type _min_val = all_dispatchs[0]->m_reduce_infos.reduce_value;
363 Type _max_val = _min_val;
364 Type _sum_val = _min_val;
365 Integer _min_rank = 0;
366 Integer _max_rank = 0;
367 for( Integer i=1; i<nb_rank; ++i ){
368 Type cval = all_dispatchs[i]->m_reduce_infos.reduce_value;
369 if (cval<_min_val){
370 _min_val = cval;
371 _min_rank = i;
372 }
373 if (cval>_max_val){
374 _max_val = cval;
375 _max_rank = i;
376 }
377 _sum_val = (Type)(_sum_val + cval);
378 }
379 min_val = _min_val;
380 max_val = _max_val;
381 sum_val = _sum_val;
382 min_rank = _min_rank;
383 max_rank = _max_rank;
384}
385
386}
387
388/*---------------------------------------------------------------------------*/
389/*---------------------------------------------------------------------------*/
390
/*!
 * \brief Collective scalar min/max/sum with the ranks of the extrema.
 *
 * Publishes \a val, synchronizes so every rank's value is visible, computes
 * the reduction locally (same result on every rank), then synchronizes
 * again so no rank overwrites its value while others still read it.
 */
template<class Type> void SharedMemoryParallelDispatch<Type>::
computeMinMaxSum(Type val,Type& min_val,Type& max_val,Type& sum_val,
                 Int32& min_rank,Int32& max_rank)
{
  typedef typename _ThreadIntegralType<Type>::IsIntegral IntegralType;
  m_reduce_infos.reduce_value = val;
  _collectiveBarrier();
  _computeMinMaxSum2(m_all_dispatchs,min_val,max_val,sum_val,min_rank,max_rank,m_nb_rank,IntegralType());
  _collectiveBarrier();
}
401
402/*---------------------------------------------------------------------------*/
403/*---------------------------------------------------------------------------*/
404
405template<class Type> void SharedMemoryParallelDispatch<Type>::
406computeMinMaxSum(ConstArrayView<Type> values,
407 ArrayView<Type> min_values,
408 ArrayView<Type> max_values,
409 ArrayView<Type> sum_values,
410 ArrayView<Int32> min_ranks,
411 ArrayView<Int32> max_ranks)
412{
413 // Implémentation sous-optimale qui ne vectorise pas le calcul
414 // (c'est actuellement un copier-coller d'au-dessus mis dans une boucle)
415 typedef typename _ThreadIntegralType<Type>::IsIntegral IntegralType;
416 Integer n = values.size();
417 for(Integer i=0;i<n;++i) {
418 m_reduce_infos.reduce_value = values[i];
419 _collectiveBarrier();
420 _computeMinMaxSum2(m_all_dispatchs,min_values[i],max_values[i],sum_values[i],
421 min_ranks[i],max_ranks[i],m_nb_rank,IntegralType());
422 _collectiveBarrier();
423 }
424}
425
426/*---------------------------------------------------------------------------*/
427/*---------------------------------------------------------------------------*/
428
//! Typed broadcast of \a send_buf from \a rank: delegates to the generic
//! memory-view implementation.
template<class Type> void SharedMemoryParallelDispatch<Type>::
broadcast(Span<Type> send_buf,Int32 rank)
{
  _genericBroadcast(MutableMemoryView(send_buf),rank);
}
434
435/*---------------------------------------------------------------------------*/
436/*---------------------------------------------------------------------------*/
437
//! Typed fixed-size all-gather: delegates to the generic implementation.
template<class Type> void SharedMemoryParallelDispatch<Type>::
allGather(Span<const Type> send_buf,Span<Type> recv_buf)
{
  _genericAllGather(ConstMemoryView{send_buf},MutableMemoryView{recv_buf});
}
443
444/*---------------------------------------------------------------------------*/
445/*---------------------------------------------------------------------------*/
446
447template<class Type> void SharedMemoryParallelDispatch<Type>::
448gather(Span<const Type> send_buf,Span<Type> recv_buf,Int32 root_rank)
449{
450 UniqueArray<Type> tmp_buf;
451 if (m_rank==root_rank)
452 allGather(send_buf,recv_buf);
453 else{
454 tmp_buf.resize(send_buf.size() * m_nb_rank);
455 allGather(send_buf,tmp_buf);
456 }
457}
458
459/*---------------------------------------------------------------------------*/
460/*---------------------------------------------------------------------------*/
461
//! Typed variable all-gather: wraps \a recv_buf so the generic
//! implementation can resize it to the total gathered size.
template<class Type> void SharedMemoryParallelDispatch<Type>::
allGatherVariable(Span<const Type> send_buf,Array<Type>& recv_buf)
{
  ResizableArrayRef recv_buf_ref(recv_buf);
  _genericAllGatherVariable(ConstMemoryView(send_buf),&recv_buf_ref);
}
468
469/*---------------------------------------------------------------------------*/
470/*---------------------------------------------------------------------------*/
471
472template<class Type> void SharedMemoryParallelDispatch<Type>::
473gatherVariable(Span<const Type> send_buf,Array<Type>& recv_buf,Int32 root_rank)
474{
475 UniqueArray<Type> tmp_buf;
476 if (m_rank==root_rank)
477 allGatherVariable(send_buf,recv_buf);
478 else
479 allGatherVariable(send_buf,tmp_buf);
480}
481
482/*---------------------------------------------------------------------------*/
483/*---------------------------------------------------------------------------*/
484
//! Typed scatter from \a root: delegates to the generic implementation.
template<class Type> void SharedMemoryParallelDispatch<Type>::
scatterVariable(Span<const Type> send_buf,Span<Type> recv_buf,Int32 root)
{
  _genericScatterVariable(ConstMemoryView(send_buf),MutableMemoryView(recv_buf),root);
}
490
491/*---------------------------------------------------------------------------*/
492/*---------------------------------------------------------------------------*/
493
//! Typed fixed-size all-to-all (\a count elements per rank pair):
//! delegates to the generic implementation.
template<class Type> void SharedMemoryParallelDispatch<Type>::
allToAll(Span<const Type> send_buf,Span<Type> recv_buf,Int32 count)
{
  _genericAllToAll(ConstMemoryView(send_buf),MutableMemoryView(recv_buf),count);
}
499
500/*---------------------------------------------------------------------------*/
501/*---------------------------------------------------------------------------*/
502
503template<class Type> void SharedMemoryParallelDispatch<Type>::
504allToAllVariable(Span<const Type> send_buf, ConstArrayView<Int32> send_count,
505 ConstArrayView<Int32> send_index,
506 Span<Type> recv_buf, ConstArrayView<Int32> recv_count,
507 Int32ConstArrayView recv_index
508 )
509{
510 _genericAllToAllVariable(ConstMemoryView(send_buf), send_count, send_index,
511 MutableMemoryView(recv_buf), recv_count, recv_index);
512}
513
514/*---------------------------------------------------------------------------*/
515/*---------------------------------------------------------------------------*/
516
/*!
 * \brief Sends \a send_buffer to \a rank.
 *
 * Builds a point-to-point message with the requested blocking mode and
 * forwards to the PointToPointMessageInfo overload.
 */
template<class Type> auto SharedMemoryParallelDispatch<Type>::
send(Span<const Type> send_buffer,Int32 rank,bool is_blocking) -> Request
{
  auto block_mode = (is_blocking) ? Parallel::Blocking : Parallel::NonBlocking;
  auto p2p_message = m_parallel_mng->buildMessage(rank,block_mode);
  return send(send_buffer,p2p_message);
}
524
525/*---------------------------------------------------------------------------*/
526/*---------------------------------------------------------------------------*/
527
//! Blocking send of \a send_buf to \a rank (the returned request is null
//! in blocking mode, so it is deliberately discarded).
template<class Type> void SharedMemoryParallelDispatch<Type>::
send(ConstArrayView<Type> send_buf,Int32 rank)
{
  send(send_buf,rank,true);
}
533
534/*---------------------------------------------------------------------------*/
535/*---------------------------------------------------------------------------*/
536
/*!
 * \brief Receives into \a recv_buffer from \a rank.
 *
 * Builds a point-to-point message with the requested blocking mode and
 * forwards to the PointToPointMessageInfo overload.
 */
template<class Type> Parallel::Request SharedMemoryParallelDispatch<Type>::
receive(Span<Type> recv_buffer,Int32 rank,bool is_blocking)
{
  auto block_mode = (is_blocking) ? Parallel::Blocking : Parallel::NonBlocking;
  auto p2p_message = m_parallel_mng->buildMessage(rank,block_mode);
  return receive(recv_buffer,p2p_message);
}
544
545/*---------------------------------------------------------------------------*/
546/*---------------------------------------------------------------------------*/
547
//! Typed send: delegates to the generic memory-view implementation.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
send(Span<const Type> send_buffer,const PointToPointMessageInfo& message2)
{
  return _genericSend(ConstMemoryView(send_buffer),message2);
}
553
554/*---------------------------------------------------------------------------*/
555/*---------------------------------------------------------------------------*/
556
//! Typed receive: delegates to the generic memory-view implementation.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
receive(Span<Type> recv_buffer,const PointToPointMessageInfo& message2)
{
  return _genericReceive(MutableMemoryView(recv_buffer),message2);
}
562
563/*---------------------------------------------------------------------------*/
564/*---------------------------------------------------------------------------*/
565
//! Blocking receive from \a rank (legacy name; delegates to the
//! blocking-mode overload).
template<class Type> void SharedMemoryParallelDispatch<Type>::
recv(ArrayView<Type> recv_buffer,Integer rank)
{
  recv(recv_buffer,rank,true);
}
571
572/*---------------------------------------------------------------------------*/
573/*---------------------------------------------------------------------------*/
574
//! Combined send/receive: not implemented in shared-memory mode.
template<class Type> void SharedMemoryParallelDispatch<Type>::
sendRecv(ConstArrayView<Type> send_buffer,ArrayView<Type> recv_buffer,Integer proc)
{
  ARCANE_UNUSED(send_buffer);
  ARCANE_UNUSED(recv_buffer);
  ARCANE_UNUSED(proc);
  throw NotImplementedException(A_FUNCINFO);
}
583
584/*---------------------------------------------------------------------------*/
585/*---------------------------------------------------------------------------*/
586
587template<class Type> Type SharedMemoryParallelDispatch<Type>::
588allReduce(eReduceType op,Type send_buf)
589{
590 m_reduce_infos.reduce_value = send_buf;
591 //cout << "ALL REDUCE BEGIN RANk=" << m_rank << " TYPE=" << (int)op << " MY=" << send_buf << '\n';
592 cout.flush();
593 _collectiveBarrier();
594 Type ret = m_all_dispatchs[0]->m_reduce_infos.reduce_value;
595 switch(op){
596 case Parallel::ReduceMin:
597 for( Integer i=1; i<m_nb_rank; ++i )
598 ret = math::min(ret,m_all_dispatchs[i]->m_reduce_infos.reduce_value);
599 break;
600 case Parallel::ReduceMax:
601 for( Integer i=1; i<m_nb_rank; ++i )
602 ret = math::max(ret,m_all_dispatchs[i]->m_reduce_infos.reduce_value);
603 break;
604 case Parallel::ReduceSum:
605 for( Integer i=1; i<m_nb_rank; ++i )
606 ret = (Type)(ret + m_all_dispatchs[i]->m_reduce_infos.reduce_value);
607 break;
608 default:
609 ARCANE_FATAL("Bad reduce type {0}",(int)op);
610 }
611 //cout << "ALL REDUCE RANK=" << m_rank << " TYPE=" << (int)op << " MY=" << send_buf << " GLOBAL=" << ret << '\n';
612 _collectiveBarrier();
613 return ret;
614}
615
616/*---------------------------------------------------------------------------*/
617/*---------------------------------------------------------------------------*/
618
/*!
 * \brief Shared implementation of array allReduce() and scan().
 *
 * Each rank publishes its buffer and bumps its collective-call counter;
 * after the barrier, mismatched counters reveal out-of-step collective
 * calls between threads (see the note at the top of this file). For a
 * reduction every rank folds all ranks' buffers; for a scan (\a is_scan)
 * rank r folds only ranks 0..r, giving an inclusive prefix operation. The
 * result is accumulated in a local array and written back to \a send_buf
 * only after the final barrier, since other ranks read send_buf (via
 * reduce_buf) until then.
 */
template<class Type> void SharedMemoryParallelDispatch<Type>::
_allReduceOrScan(eReduceType op, Span<Type> send_buf, bool is_scan)
{
  m_reduce_infos.reduce_buf = send_buf;
  ++m_reduce_infos.m_index;
  Int64 buf_size = send_buf.size();
  UniqueArray<Type> ret(buf_size);
  //cout << "ALL REDUCE BEGIN RANk=" << m_rank << " TYPE=" << (int)op << " MY=" << send_buf << '\n';
  //cout.flush();
  _collectiveBarrier();
  // Coherency check: every rank must be on the same collective call number.
  {
    Integer index0 = m_all_dispatchs[0]->m_reduce_infos.m_index;
    for( Integer i=0; i<m_nb_rank; ++i ){
      Integer indexi = m_all_dispatchs[i]->m_reduce_infos.m_index;
      if (index0!=m_all_dispatchs[i]->m_reduce_infos.m_index){
        ARCANE_FATAL("INTERNAL: incoherent all reduce i0={0} in={1} n={2}",
                     index0,indexi,i);
      }
    }
  }
  // Scan: rank r only folds contributions from ranks 0..r.
  Int32 nb_rank = m_nb_rank;
  if (is_scan)
    nb_rank = m_rank + 1;
  for( Integer j=0; j<buf_size; ++j )
    ret[j] = m_all_dispatchs[0]->m_reduce_infos.reduce_buf[j];
  switch(op){
  case Parallel::ReduceMin:
    for (Integer i = 1; i < nb_rank; ++i)
      for( Integer j=0; j<buf_size; ++j )
        ret[j] = math::min(ret[j],m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
    break;
  case Parallel::ReduceMax:
    for (Integer i = 1; i < nb_rank; ++i)
      for( Integer j=0; j<buf_size; ++j )
        ret[j] = math::max(ret[j],m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
    break;
  case Parallel::ReduceSum:
    for (Integer i = 1; i < nb_rank; ++i)
      for( Integer j=0; j<buf_size; ++j )
        ret[j] = (Type)(ret[j] + m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
    break;
  default:
    ARCANE_FATAL("Bad reduce type");
  }
  //cout << "ALL REDUCE RANK=" << m_rank << " TYPE=" << (int)op << " MY=" << send_buf << " GLOBAL=" << ret << '\n';
  // Write back only after all ranks have finished reading the buffers.
  _collectiveBarrier();
  for( Integer j=0; j<buf_size; ++j )
    send_buf[j] = ret[j];
}
668
669/*---------------------------------------------------------------------------*/
670/*---------------------------------------------------------------------------*/
671
//! In-place array reduction: delegates to _allReduceOrScan() without scan.
template <class Type> void SharedMemoryParallelDispatch<Type>::
allReduce(eReduceType op, Span<Type> send_buf)
{
  _allReduceOrScan(op, send_buf, false);
}
677
678/*---------------------------------------------------------------------------*/
679/*---------------------------------------------------------------------------*/
680
//! Non-blocking reduction: not implemented in shared-memory mode.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingAllReduce(eReduceType op,Span<const Type> send_buf,Span<Type> recv_buf)
{
  ARCANE_UNUSED(op);
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  throw NotImplementedException(A_FUNCINFO);
}
689
690/*---------------------------------------------------------------------------*/
691/*---------------------------------------------------------------------------*/
692
//! Non-blocking all-gather: not implemented in shared-memory mode.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingAllGather(Span<const Type> send_buf, Span<Type> recv_buf)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  throw NotImplementedException(A_FUNCINFO);
}
700
701/*---------------------------------------------------------------------------*/
702/*---------------------------------------------------------------------------*/
703
//! Non-blocking broadcast: not implemented in shared-memory mode.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingBroadcast(Span<Type> send_buf, Int32 rank)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(rank);
  throw NotImplementedException(A_FUNCINFO);
}
711
712/*---------------------------------------------------------------------------*/
713/*---------------------------------------------------------------------------*/
714
//! Non-blocking gather: not implemented in shared-memory mode.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingGather(Span<const Type> send_buf, Span<Type> recv_buf, Int32 rank)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  ARCANE_UNUSED(rank);
  throw NotImplementedException(A_FUNCINFO);
}
723
724/*---------------------------------------------------------------------------*/
725/*---------------------------------------------------------------------------*/
726
//! Non-blocking all-to-all: not implemented in shared-memory mode.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingAllToAll(Span<const Type> send_buf, Span<Type> recv_buf, Int32 count)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  ARCANE_UNUSED(count);
  throw NotImplementedException(A_FUNCINFO);
}
735
736/*---------------------------------------------------------------------------*/
737/*---------------------------------------------------------------------------*/
738
//! Non-blocking variable all-to-all: not implemented in shared-memory mode.
template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingAllToAllVariable(Span<const Type> send_buf, ConstArrayView<Int32> send_count,
                            ConstArrayView<Int32> send_index, Span<Type> recv_buf,
                            ConstArrayView<Int32> recv_count, ConstArrayView<Int32> recv_index)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  ARCANE_UNUSED(send_count);
  ARCANE_UNUSED(recv_count);
  ARCANE_UNUSED(send_index);
  ARCANE_UNUSED(recv_index);
  throw NotImplementedException(A_FUNCINFO);
}
752
753/*---------------------------------------------------------------------------*/
754/*---------------------------------------------------------------------------*/
755
//! Scalar scan: not implemented (only the array overload below is).
template<class Type> Type SharedMemoryParallelDispatch<Type>::
scan(eReduceType op,Type send_buf)
{
  ARCANE_UNUSED(op);
  ARCANE_UNUSED(send_buf);
  throw NotImplementedException(A_FUNCINFO);
}
763
764/*---------------------------------------------------------------------------*/
765/*---------------------------------------------------------------------------*/
766
//! In-place inclusive array scan: delegates to _allReduceOrScan() with the
//! scan flag set (rank r folds contributions from ranks 0..r).
template<class Type> void SharedMemoryParallelDispatch<Type>::
scan(eReduceType op,ArrayView<Type> send_buf)
{
  _allReduceOrScan(op, send_buf, true);
}
772
773/*---------------------------------------------------------------------------*/
774/*---------------------------------------------------------------------------*/
775
//! Not implemented: apparently unused (temporary, per original note).
template<class Type> void SharedMemoryParallelDispatch<Type>::
waitAll()
{
  throw NotImplementedException(A_FUNCINFO);
}
782
783/*---------------------------------------------------------------------------*/
784/*---------------------------------------------------------------------------*/
785
786template<class Type> Request SharedMemoryParallelDispatch<Type>::
788{
789 throw NotImplementedException(A_FUNCINFO);
790}
791
792/*---------------------------------------------------------------------------*/
793/*---------------------------------------------------------------------------*/
794
// Explicit instantiations for every data type dispatched by the
// shared-memory parallel manager.
template class SharedMemoryParallelDispatch<char>;
template class SharedMemoryParallelDispatch<signed char>;
template class SharedMemoryParallelDispatch<unsigned char>;
template class SharedMemoryParallelDispatch<short>;
template class SharedMemoryParallelDispatch<unsigned short>;
template class SharedMemoryParallelDispatch<int>;
template class SharedMemoryParallelDispatch<unsigned int>;
template class SharedMemoryParallelDispatch<long>;
template class SharedMemoryParallelDispatch<unsigned long>;
template class SharedMemoryParallelDispatch<long long>;
template class SharedMemoryParallelDispatch<unsigned long long>;
template class SharedMemoryParallelDispatch<float>;
template class SharedMemoryParallelDispatch<double>;
template class SharedMemoryParallelDispatch<long double>;
template class SharedMemoryParallelDispatch<APReal>;
template class SharedMemoryParallelDispatch<Real2>;
template class SharedMemoryParallelDispatch<Real3>;
template class SharedMemoryParallelDispatch<Real2x2>;
template class SharedMemoryParallelDispatch<Real3x3>;
template class SharedMemoryParallelDispatch<HPReal>;
815
816/*---------------------------------------------------------------------------*/
817/*---------------------------------------------------------------------------*/
818
819} // End namespace Arcane::MessagePassing
820
821/*---------------------------------------------------------------------------*/
822/*---------------------------------------------------------------------------*/
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Informations pour un message 'gather' pour le type de données DataType.
Interface d'une file de messages avec les threads.
Gestionnaire du parallélisme utilisant les threads.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Definition Parallel.h:50
Int32 Integer
Type représentant un entier.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:428
Type
Type of JSON value.
Definition rapidjson.h:665