#include "arcane/utils/ArcanePrecomp.h"

#include "arcane/utils/Array.h"
#include "arcane/utils/PlatformUtils.h"
#include "arcane/utils/String.h"
#include "arcane/utils/ITraceMng.h"
#include "arcane/utils/NumericTypes.h"
#include "arcane/utils/APReal.h"
#include "arcane/utils/NotImplementedException.h"
#include "arcane/utils/NotSupportedException.h"
#include "arcane/utils/FatalErrorException.h"
#include "arcane/utils/MemoryView.h"
#include "arcane/utils/MemoryUtils.h"

#include "arcane/MeshVariable.h"
#include "arcane/IParallelMng.h"
#include "arcane/ItemGroup.h"
#include "arcane/IMesh.h"
#include "arcane/IBase.h"

#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"

#include "arccore/message_passing/PointToPointMessageInfo.h"

SharedMemoryParallelDispatchBase::
SharedMemoryParallelDispatchBase(ITraceMng* tm,SharedMemoryParallelMng* parallel_mng,
                                 ISharedMemoryMessageQueue* message_queue,
                                 ArrayView<SharedMemoryParallelDispatchBase*> all_dispatchs_base)
: TraceAccessor(tm)
, m_parallel_mng(parallel_mng)
, m_rank(parallel_mng->commRank())
, m_nb_rank(parallel_mng->commSize())
, m_message_queue(message_queue)
, m_all_dispatchs_base(all_dispatchs_base)
{
}

void SharedMemoryParallelDispatchBase::
_collectiveBarrier()
{
  m_parallel_mng->getThreadBarrier()->wait();
}

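//! All-to-all with a fixed element count per rank: every rank exchanges
//! exactly 'count' elements with every other rank at regularly spaced offsets.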
void SharedMemoryParallelDispatchBase::
_genericAllToAll(ConstMemoryView send_buf,MutableMemoryView recv_buf,Int32 count)
{
  Int32 nb_rank = m_nb_rank;
  UniqueArray<Int32> send_count(nb_rank,count);
  UniqueArray<Int32> recv_count(nb_rank,count);
  UniqueArray<Int32> send_indexes(nb_rank);
  UniqueArray<Int32> recv_indexes(nb_rank);
  for( Integer i=0; i<nb_rank; ++i ){
    send_indexes[i] = count * i;
    recv_indexes[i] = count * i;
  }
  _genericAllToAllVariable(send_buf,send_count,send_indexes,recv_buf,recv_count,recv_indexes);
}

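//! Variable all-to-all: a barrier publishes every rank's buffer description,
//! then each rank copies its own slice out of every sender's buffer.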
void SharedMemoryParallelDispatchBase::
_genericAllToAllVariable(ConstMemoryView send_buf,
                         Span<const Int32> send_count,
                         Span<const Int32> send_index,
                         MutableMemoryView recv_buf,
                         Span<const Int32> recv_count,
                         Span<const Int32> recv_index)
{
  m_alltoallv_infos.send_buf = send_buf;
  m_alltoallv_infos.send_count = send_count;
  m_alltoallv_infos.send_index = send_index;
  m_alltoallv_infos.recv_buf = recv_buf;
  m_alltoallv_infos.recv_count = recv_count;
  m_alltoallv_infos.recv_index = recv_index;
  _collectiveBarrier();

  Int64 global_index = 0;
  Int32 my_rank = m_rank;
  MutableMemoryView recv_mem_buf(recv_buf);
  for( Integer i=0; i<m_nb_rank; ++i ){
    AllToAllVariableInfo ainfo = m_all_dispatchs_base[i]->m_alltoallv_infos;
    ConstMemoryView view(ainfo.send_buf);
    Integer index = ainfo.send_index[my_rank];
    Integer count = ainfo.send_count[my_rank];
    MemoryUtils::copyHost(recv_mem_buf.subView(global_index,count), view.subView(index,count));
    global_index += count;
  }
  _collectiveBarrier();
}

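//! Gather on every rank: each rank publishes its send view, then copies the
//! views of all ranks, in rank order, into its own receive buffer.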
void SharedMemoryParallelDispatchBase::
_genericAllGather(ConstMemoryView send_buf,MutableMemoryView recv_buf)
{
  m_const_view = send_buf;
  _collectiveBarrier();
  MutableMemoryView recv_mem_view(recv_buf);
  Int64 index = 0;
  for( Int32 i=0; i<m_nb_rank; ++i ){
    ConstMemoryView view(m_all_dispatchs_base[i]->m_const_view);
    Int64 size = view.nbElement();
    MemoryUtils::copyHost(recv_mem_view.subView(index,size), view);
    index += size;
  }
  _collectiveBarrier();
}

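//! Variable-size gather on every rank: a first pass computes the total size
//! so the receive array can be resized before the copies are done.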
void SharedMemoryParallelDispatchBase::
_genericAllGatherVariable(ConstMemoryView send_buf,IResizableArray* recv_buf)
{
  m_const_view = send_buf;
  _collectiveBarrier();
  Int64 total_size = 0;
  for( Integer i=0; i<m_nb_rank; ++i ){
    total_size += m_all_dispatchs_base[i]->m_const_view.nbElement();
  }
  recv_buf->resize(total_size);
  MutableMemoryView recv_mem_view(recv_buf->memoryView());
  Int64 index = 0;
  for( Integer i=0; i<m_nb_rank; ++i ){
    ConstMemoryView view(m_all_dispatchs_base[i]->m_const_view);
    Int64 size = view.nbElement();
    MemoryUtils::copyHost(recv_mem_view.subView(index,size), view);
    index += size;
  }
  _collectiveBarrier();
}

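//! Variable scatter: every rank publishes its receive view; the root then
//! copies consecutive slices of its send buffer into each rank's view.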
void SharedMemoryParallelDispatchBase::
_genericScatterVariable(ConstMemoryView send_buf,MutableMemoryView recv_buf,Int32 root)
{
  m_const_view = send_buf;
  m_recv_view = recv_buf;
  _collectiveBarrier();
  if (m_rank==root){
    ConstMemoryView const_view(m_const_view);
    Int64 index = 0;
    for( Integer i=0; i<m_nb_rank; ++i ){
      MutableMemoryView view(m_all_dispatchs_base[i]->m_recv_view);
      Int64 size = view.nbElement();
      MemoryUtils::copyHost(view, const_view.subView(index,size));
      index += size;
    }
  }
  _collectiveBarrier();
}

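//! Point-to-point send through the shared-memory message queue. Blocking
//! sends wait for completion; only rank/tag destinations are supported.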
Request SharedMemoryParallelDispatchBase::
_genericSend(ConstMemoryView send_buffer,const PointToPointMessageInfo& message2)
{
  PointToPointMessageInfo message(message2);
  message.setEmiterRank(MessageRank(m_rank));
  bool is_blocking = message.isBlocking();
  if (message.isRankTag()){
    Request r = m_message_queue->addSend(message,SendBufferInfo(send_buffer));
    if (is_blocking){
      m_message_queue->waitAll(ArrayView<Request>(1,&r));
      return Request();
    }
    return r;
  }
  if (message.isMessageId()){
    // A send described by a MessageId is not supported.
    ARCCORE_THROW(NotSupportedException,"Invalid generic send with MessageId");
  }
  ARCCORE_THROW(NotSupportedException,"Invalid message_info");
}

Request SharedMemoryParallelDispatchBase::
_genericReceive(MutableMemoryView recv_buffer,const PointToPointMessageInfo& message2)
{
  PointToPointMessageInfo message(message2);
  bool is_blocking = message.isBlocking();
  message.setEmiterRank(MessageRank(m_rank));
  ReceiveBufferInfo buf{recv_buffer};
  Request r = m_message_queue->addReceive(message,buf);
  if (is_blocking){
    m_message_queue->waitAll(ArrayView<Request>(1,&r));
    return MP::Request();
  }
  return r;
}

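//! Broadcast: after a barrier, every rank copies the broadcast view published
//! by the root rank into its own buffer.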
void SharedMemoryParallelDispatchBase::
_genericBroadcast(MutableMemoryView send_buf,Int32 rank)
{
  m_broadcast_view = send_buf;
  _collectiveBarrier();
  MemoryUtils::copyHost(m_broadcast_view, m_all_dispatchs_base[rank]->m_broadcast_view);
  _collectiveBarrier();
}

template<class Type> SharedMemoryParallelDispatch<Type>::
SharedMemoryParallelDispatch(ITraceMng* tm,SharedMemoryParallelMng* parallel_mng,
                             ISharedMemoryMessageQueue* message_queue,
                             impl::ShareMemoryDispatcherContainer<Type>& containers)
: BaseClass(tm,parallel_mng,message_queue,containers.all_dispatchs_base)
, m_all_dispatchs(containers.all_dispatchs)
{
  m_reduce_infos.m_index = 0;
  m_all_dispatchs[m_rank] = this;
  m_all_dispatchs_base[m_rank] = this;
}

template<class Type> SharedMemoryParallelDispatch<Type>::
~SharedMemoryParallelDispatch()
{
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
finalize()
{
}

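//! Trait used to select the min/max/sum implementation: the types listed
//! below through ARCANE_DEFINE_INTEGRAL_TYPE get the real implementation,
//! any other type falls back to NotImplementedException.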
template<class Type> class _ThreadIntegralType
{
 public: typedef FalseType IsIntegral;
};

#define ARCANE_DEFINE_INTEGRAL_TYPE(datatype)\
template<> class _ThreadIntegralType<datatype>\
{ public: typedef TrueType IsIntegral; };

ARCANE_DEFINE_INTEGRAL_TYPE(long long);
ARCANE_DEFINE_INTEGRAL_TYPE(long);
ARCANE_DEFINE_INTEGRAL_TYPE(int);
ARCANE_DEFINE_INTEGRAL_TYPE(short);
ARCANE_DEFINE_INTEGRAL_TYPE(unsigned long long);
ARCANE_DEFINE_INTEGRAL_TYPE(unsigned long);
ARCANE_DEFINE_INTEGRAL_TYPE(unsigned int);
ARCANE_DEFINE_INTEGRAL_TYPE(unsigned short);
ARCANE_DEFINE_INTEGRAL_TYPE(double);
ARCANE_DEFINE_INTEGRAL_TYPE(float);
ARCANE_DEFINE_INTEGRAL_TYPE(HPReal);

template<class Type> void
_computeMinMaxSum2(ArrayView<SharedMemoryParallelDispatch<Type>*> all_dispatchs,
                   Type& min_val,Type& max_val,Type& sum_val,
                   Int32& min_rank,Int32& max_rank,Int32 nb_rank,FalseType)
{
  ARCANE_UNUSED(all_dispatchs);
  ARCANE_UNUSED(min_val);
  ARCANE_UNUSED(max_val);
  ARCANE_UNUSED(sum_val);
  ARCANE_UNUSED(min_rank);
  ARCANE_UNUSED(max_rank);
  ARCANE_UNUSED(nb_rank);
  throw NotImplementedException(A_FUNCINFO);
}

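//! Computes min, max and sum of the values published by all ranks, together
//! with the ranks holding the minimum and the maximum.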
template<class Type> void
_computeMinMaxSum2(ArrayView<SharedMemoryParallelDispatch<Type>*> all_dispatchs,
                   Type& min_val,Type& max_val,Type& sum_val,
                   Int32& min_rank,Int32& max_rank,Int32 nb_rank,TrueType)
{
  Type _min_val = all_dispatchs[0]->m_reduce_infos.reduce_value;
  Type _max_val = _min_val;
  Type _sum_val = _min_val;
  Int32 _min_rank = 0;
  Int32 _max_rank = 0;
  for( Integer i=1; i<nb_rank; ++i ){
    Type cval = all_dispatchs[i]->m_reduce_infos.reduce_value;
    if (cval<_min_val){
      _min_val = cval;
      _min_rank = i;
    }
    if (_max_val<cval){
      _max_val = cval;
      _max_rank = i;
    }
    _sum_val = (Type)(_sum_val + cval);
  }
  min_val = _min_val;
  max_val = _max_val;
  sum_val = _sum_val;
  min_rank = _min_rank;
  max_rank = _max_rank;
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
computeMinMaxSum(Type val,Type& min_val,Type& max_val,Type& sum_val,
                 Int32& min_rank,Int32& max_rank)
{
  typedef typename _ThreadIntegralType<Type>::IsIntegral IntegralType;
  m_reduce_infos.reduce_value = val;
  _collectiveBarrier();
  _computeMinMaxSum2(m_all_dispatchs,min_val,max_val,sum_val,min_rank,max_rank,m_nb_rank,IntegralType());
  _collectiveBarrier();
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
computeMinMaxSum(ConstArrayView<Type> values,
                 ArrayView<Type> min_values,
                 ArrayView<Type> max_values,
                 ArrayView<Type> sum_values,
                 ArrayView<Int32> min_ranks,
                 ArrayView<Int32> max_ranks)
{
  typedef typename _ThreadIntegralType<Type>::IsIntegral IntegralType;
  Integer n = values.size();
  for( Integer i=0; i<n; ++i ){
    m_reduce_infos.reduce_value = values[i];
    _collectiveBarrier();
    _computeMinMaxSum2(m_all_dispatchs,min_values[i],max_values[i],sum_values[i],
                       min_ranks[i],max_ranks[i],m_nb_rank,IntegralType());
    _collectiveBarrier();
  }
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
broadcast(Span<Type> send_buf,Int32 rank)
{
  _genericBroadcast(MutableMemoryView(send_buf),rank);
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
allGather(Span<const Type> send_buf,Span<Type> recv_buf)
{
  _genericAllGather(ConstMemoryView{send_buf},MutableMemoryView{recv_buf});
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
gather(Span<const Type> send_buf,Span<Type> recv_buf,Int32 root_rank)
{
  UniqueArray<Type> tmp_buf;
  if (m_rank==root_rank)
    allGather(send_buf,recv_buf);
  else{
    // Non-root ranks still take part in the allGather; their result is
    // gathered into a temporary buffer and discarded.
    tmp_buf.resize(send_buf.size() * m_nb_rank);
    allGather(send_buf,tmp_buf);
  }
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
allGatherVariable(Span<const Type> send_buf,Array<Type>& recv_buf)
{
  ResizableArrayRef recv_buf_ref(recv_buf);
  _genericAllGatherVariable(ConstMemoryView(send_buf),&recv_buf_ref);
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
gatherVariable(Span<const Type> send_buf,Array<Type>& recv_buf,Int32 root_rank)
{
  UniqueArray<Type> tmp_buf;
  if (m_rank==root_rank)
    allGatherVariable(send_buf,recv_buf);
  else
    allGatherVariable(send_buf,tmp_buf);
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
scatterVariable(Span<const Type> send_buf,Span<Type> recv_buf,Int32 root)
{
  _genericScatterVariable(ConstMemoryView(send_buf),MutableMemoryView(recv_buf),root);
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
allToAll(Span<const Type> send_buf,Span<Type> recv_buf,Int32 count)
{
  _genericAllToAll(ConstMemoryView(send_buf),MutableMemoryView(recv_buf),count);
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
allToAllVariable(Span<const Type> send_buf, ConstArrayView<Int32> send_count,
                 ConstArrayView<Int32> send_index,
                 Span<Type> recv_buf, ConstArrayView<Int32> recv_count,
                 ConstArrayView<Int32> recv_index)
{
  _genericAllToAllVariable(ConstMemoryView(send_buf), send_count, send_index,
                           MutableMemoryView(recv_buf), recv_count, recv_index);
}

template<class Type> auto SharedMemoryParallelDispatch<Type>::
send(Span<const Type> send_buffer,Int32 rank,bool is_blocking) -> Request
{
  auto block_mode = (is_blocking) ? Parallel::Blocking : Parallel::NonBlocking;
  auto p2p_message = m_parallel_mng->buildMessage(rank,block_mode);
  return send(send_buffer,p2p_message);
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
send(ConstArrayView<Type> send_buf,Int32 rank)
{
  send(send_buf,rank,true);
}

template<class Type> Parallel::Request SharedMemoryParallelDispatch<Type>::
receive(Span<Type> recv_buffer,Int32 rank,bool is_blocking)
{
  auto block_mode = (is_blocking) ? Parallel::Blocking : Parallel::NonBlocking;
  auto p2p_message = m_parallel_mng->buildMessage(rank,block_mode);
  return receive(recv_buffer,p2p_message);
}

template<class Type> Request SharedMemoryParallelDispatch<Type>::
send(Span<const Type> send_buffer,const PointToPointMessageInfo& message2)
{
  return _genericSend(ConstMemoryView(send_buffer),message2);
}

template<class Type> Request SharedMemoryParallelDispatch<Type>::
receive(Span<Type> recv_buffer,const PointToPointMessageInfo& message2)
{
  return _genericReceive(MutableMemoryView(recv_buffer),message2);
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
recv(ArrayView<Type> recv_buffer,Integer rank)
{
  recv(recv_buffer,rank,true);
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
sendRecv(ConstArrayView<Type> send_buffer,ArrayView<Type> recv_buffer,Integer proc)
{
  ARCANE_UNUSED(send_buffer);
  ARCANE_UNUSED(recv_buffer);
  ARCANE_UNUSED(proc);
  throw NotImplementedException(A_FUNCINFO);
}

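//! Scalar reduction over all ranks: every rank publishes its value, then each
//! rank recomputes the min/max/sum locally from the published values.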
template<class Type> Type SharedMemoryParallelDispatch<Type>::
allReduce(eReduceType op,Type send_buf)
{
  m_reduce_infos.reduce_value = send_buf;
  _collectiveBarrier();
  Type ret = m_all_dispatchs[0]->m_reduce_infos.reduce_value;
  switch(op){
  case Parallel::ReduceMin:
    for( Integer i=1; i<m_nb_rank; ++i )
      ret = math::min(ret,m_all_dispatchs[i]->m_reduce_infos.reduce_value);
    break;
  case Parallel::ReduceMax:
    for( Integer i=1; i<m_nb_rank; ++i )
      ret = math::max(ret,m_all_dispatchs[i]->m_reduce_infos.reduce_value);
    break;
  case Parallel::ReduceSum:
    for( Integer i=1; i<m_nb_rank; ++i )
      ret = (Type)(ret + m_all_dispatchs[i]->m_reduce_infos.reduce_value);
    break;
  default:
    ARCANE_FATAL("Unknown reduce operation {0}",(int)op);
  }
  _collectiveBarrier();
  return ret;
}

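//! Array reduction, also used for the inclusive scan: for a scan, rank r only
//! combines the buffers of ranks 0..r. A shared call counter (m_index) checks
//! that all ranks are executing the same collective operation.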
template<class Type> void SharedMemoryParallelDispatch<Type>::
_allReduceOrScan(eReduceType op, Span<Type> send_buf, bool is_scan)
{
  m_reduce_infos.reduce_buf = send_buf;
  ++m_reduce_infos.m_index;
  Int64 buf_size = send_buf.size();
  UniqueArray<Type> ret(buf_size);
  _collectiveBarrier();

  // Checks every rank is taking part in the same collective operation.
  Integer index0 = m_all_dispatchs[0]->m_reduce_infos.m_index;
  for( Integer i=0; i<m_nb_rank; ++i ){
    Integer indexi = m_all_dispatchs[i]->m_reduce_infos.m_index;
    if (index0!=m_all_dispatchs[i]->m_reduce_infos.m_index){
      ARCANE_FATAL("INTERNAL: incoherent all reduce i0={0} in={1} n={2}",
                   index0,indexi,i);
    }
  }

  // For a scan, only the ranks lower than or equal to this one are combined.
  Int32 nb_rank = m_nb_rank;
  if (is_scan)
    nb_rank = m_rank + 1;
  for( Integer j=0; j<buf_size; ++j )
    ret[j] = m_all_dispatchs[0]->m_reduce_infos.reduce_buf[j];
  switch(op){
  case Parallel::ReduceMin:
    for (Integer i = 1; i < nb_rank; ++i)
      for( Integer j=0; j<buf_size; ++j )
        ret[j] = math::min(ret[j],m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
    break;
  case Parallel::ReduceMax:
    for (Integer i = 1; i < nb_rank; ++i)
      for( Integer j=0; j<buf_size; ++j )
        ret[j] = math::max(ret[j],m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
    break;
  case Parallel::ReduceSum:
    for (Integer i = 1; i < nb_rank; ++i)
      for( Integer j=0; j<buf_size; ++j )
        ret[j] = (Type)(ret[j] + m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
    break;
  default:
    ARCANE_FATAL("Unknown reduce operation {0}",(int)op);
  }
  _collectiveBarrier();
  for( Integer j=0; j<buf_size; ++j )
    send_buf[j] = ret[j];
}

template <class Type> void SharedMemoryParallelDispatch<Type>::
allReduce(eReduceType op, Span<Type> send_buf)
{
  _allReduceOrScan(op, send_buf, false);
}

template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingAllReduce(eReduceType op,Span<const Type> send_buf,Span<Type> recv_buf)
{
  ARCANE_UNUSED(op);
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  throw NotImplementedException(A_FUNCINFO);
}

template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingAllGather(Span<const Type> send_buf, Span<Type> recv_buf)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  throw NotImplementedException(A_FUNCINFO);
}

template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingBroadcast(Span<Type> send_buf, Int32 rank)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(rank);
  throw NotImplementedException(A_FUNCINFO);
}

template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingGather(Span<const Type> send_buf, Span<Type> recv_buf, Int32 rank)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  ARCANE_UNUSED(rank);
  throw NotImplementedException(A_FUNCINFO);
}

template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingAllToAll(Span<const Type> send_buf, Span<Type> recv_buf, Int32 count)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  ARCANE_UNUSED(count);
  throw NotImplementedException(A_FUNCINFO);
}

template<class Type> Request SharedMemoryParallelDispatch<Type>::
nonBlockingAllToAllVariable(Span<const Type> send_buf, ConstArrayView<Int32> send_count,
                            ConstArrayView<Int32> send_index, Span<Type> recv_buf,
                            ConstArrayView<Int32> recv_count, ConstArrayView<Int32> recv_index)
{
  ARCANE_UNUSED(send_buf);
  ARCANE_UNUSED(recv_buf);
  ARCANE_UNUSED(send_count);
  ARCANE_UNUSED(recv_count);
  ARCANE_UNUSED(send_index);
  ARCANE_UNUSED(recv_index);
  throw NotImplementedException(A_FUNCINFO);
}

template<class Type> Type SharedMemoryParallelDispatch<Type>::
scan(eReduceType op,Type send_buf)
{
  ARCANE_UNUSED(op);
  ARCANE_UNUSED(send_buf);
  throw NotImplementedException(A_FUNCINFO);
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
scan(eReduceType op,ArrayView<Type> send_buf)
{
  _allReduceOrScan(op, send_buf, true);
}

template<class Type> void SharedMemoryParallelDispatch<Type>::
  throw NotImplementedException(A_FUNCINFO);

template<class Type> Request SharedMemoryParallelDispatch<Type>::
  throw NotImplementedException(A_FUNCINFO);

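//! Explicit instantiations for every data type handled by the dispatcher.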
template class SharedMemoryParallelDispatch<char>;
template class SharedMemoryParallelDispatch<signed char>;
template class SharedMemoryParallelDispatch<unsigned char>;
template class SharedMemoryParallelDispatch<short>;
template class SharedMemoryParallelDispatch<unsigned short>;
template class SharedMemoryParallelDispatch<int>;
template class SharedMemoryParallelDispatch<unsigned int>;
template class SharedMemoryParallelDispatch<long>;
template class SharedMemoryParallelDispatch<unsigned long>;
template class SharedMemoryParallelDispatch<long long>;
template class SharedMemoryParallelDispatch<unsigned long long>;
template class SharedMemoryParallelDispatch<float>;
template class SharedMemoryParallelDispatch<double>;
template class SharedMemoryParallelDispatch<long double>;
template class SharedMemoryParallelDispatch<APReal>;
template class SharedMemoryParallelDispatch<Real2>;
template class SharedMemoryParallelDispatch<Real3>;
template class SharedMemoryParallelDispatch<Real2x2>;
template class SharedMemoryParallelDispatch<Real3x3>;
template class SharedMemoryParallelDispatch<HPReal>;