14#include "arcane/utils/ArcanePrecomp.h"
16#include "arcane/utils/Array.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/String.h"
19#include "arcane/utils/ITraceMng.h"
20#include "arcane/utils/Real2.h"
21#include "arcane/utils/Real3.h"
22#include "arcane/utils/Real2x2.h"
23#include "arcane/utils/Real3x3.h"
24#include "arcane/utils/APReal.h"
25#include "arcane/utils/NotImplementedException.h"
26#include "arcane/utils/MemoryView.h"
28#include "arcane/MeshVariable.h"
29#include "arcane/IParallelMng.h"
30#include "arcane/ItemGroup.h"
31#include "arcane/IMesh.h"
32#include "arcane/IBase.h"
34#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
35#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
36#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
38#include "arccore/message_passing/PointToPointMessageInfo.h"
// Constructor: caches this thread's rank and the total rank count from the
// shared-memory parallel manager, and stores the shared array of per-rank
// dispatcher pointers that every collective operation below reads from.
// NOTE(review): the leading base/member initializer using 'tm' and the (empty)
// body braces are elided in this extract.
72SharedMemoryParallelDispatchBase::
73SharedMemoryParallelDispatchBase(ITraceMng* tm,SharedMemoryParallelMng* parallel_mng,
74                               ISharedMemoryMessageQueue* message_queue,
75                               ArrayView<SharedMemoryParallelDispatchBase*> all_dispatchs_base)
77, m_parallel_mng(parallel_mng)
78, m_rank(parallel_mng->commRank())
79, m_nb_rank(parallel_mng->commSize())
80, m_message_queue(message_queue)
81, m_all_dispatchs_base(all_dispatchs_base)
// Synchronization point across all threads of the shared-memory parallel
// manager: blocks until every rank has reached the barrier.
// NOTE(review): the method name line is elided in this extract (presumably the
// _collectiveBarrier() helper invoked by the collectives below — confirm).
88void SharedMemoryParallelDispatchBase::
91 m_parallel_mng->getThreadBarrier()->wait();
// Fixed-size all-to-all: every rank exchanges exactly 'count' elements with
// every other rank. Implemented by building regular offsets (count * rank)
// and delegating to the variable-size variant.
97void SharedMemoryParallelDispatchBase::
98_genericAllToAll(ConstMemoryView send_buf,MutableMemoryView recv_buf,
Int32 count)
100 Int32 nb_rank = m_nb_rank;
// NOTE(review): the declarations of send_count/recv_count and the
// send_indexes/recv_indexes arrays are elided in this extract.
108 for( Integer i=0; i<nb_rank; ++i ){
109 send_indexes[i] = count * i;
110 recv_indexes[i] = count * i;
112 _genericAllToAllVariable(send_buf,send_count,send_indexes,recv_buf,recv_count,recv_indexes);
// Variable-size all-to-all. Phase 1: each rank publishes its buffers, counts
// and indexes in m_alltoallv_infos, then waits on a barrier so every rank's
// info is visible. Phase 2: each rank pulls, from every peer i, the slice of
// i's send buffer destined to this rank (offset send_index[my_rank], length
// send_count[my_rank]) and appends it to its own receive buffer.
118void SharedMemoryParallelDispatchBase::
119_genericAllToAllVariable(ConstMemoryView send_buf,
120                         Span<const Int32> send_count,
121                         Span<const Int32> send_index,
122                         MutableMemoryView recv_buf,
123                         Span<const Int32> recv_count,
124                         Span<const Int32> recv_index
127 m_alltoallv_infos.send_buf = send_buf;
128 m_alltoallv_infos.send_count = send_count;
129 m_alltoallv_infos.send_index = send_index;
130 m_alltoallv_infos.recv_buf = recv_buf;
131 m_alltoallv_infos.recv_count = recv_count;
132 m_alltoallv_infos.recv_index = recv_index;
133 _collectiveBarrier();
135 Int32 my_rank = m_rank;
136 MutableMemoryView recv_mem_buf(recv_buf);
137 for( Integer i=0; i<m_nb_rank; ++i ){
138 AllToAllVariableInfo ainfo = m_all_dispatchs_base[i]->m_alltoallv_infos;
139 ConstMemoryView view(ainfo.send_buf);
140 Integer index = ainfo.send_index[my_rank];
141 Integer count = ainfo.send_count[my_rank];
// NOTE(review): the declaration of 'global_index' (running write offset into
// the receive buffer) is elided in this extract.
142 recv_mem_buf.subView(global_index,count).copyHost(view.subView(index,count));
143 global_index += count;
// Final barrier: no rank may leave (and recycle its buffers) before every
// rank has finished reading the shared views.
145 _collectiveBarrier();
// All-gather: each rank publishes its send view in m_const_view; after the
// barrier, every rank concatenates all ranks' views (in rank order) into its
// own receive buffer.
151void SharedMemoryParallelDispatchBase::
152_genericAllGather(ConstMemoryView send_buf,MutableMemoryView recv_buf)
154 m_const_view = send_buf;
155 _collectiveBarrier();
156 MutableMemoryView recv_mem_view(recv_buf);
// NOTE(review): the declaration of 'index' (running write offset) and its
// increment are elided in this extract.
158 for(
Int32 i=0; i<m_nb_rank; ++i ){
159 ConstMemoryView view(m_all_dispatchs_base[i]->m_const_view);
160 Int64 size = view.nbElement();
161 recv_mem_view.subView(index,size).copyHost(view);
164 _collectiveBarrier();
// Variable-size all-gather: after publishing the local view, sums all ranks'
// element counts to size the destination array, then concatenates every
// rank's contribution in rank order.
170void SharedMemoryParallelDispatchBase::
171_genericAllGatherVariable(ConstMemoryView send_buf,IResizableArray* recv_buf)
173 m_const_view = send_buf;
174 _collectiveBarrier();
175 Int64 total_size = 0;
176 for( Integer i=0; i<m_nb_rank; ++i ){
177 total_size += m_all_dispatchs_base[i]->m_const_view.nbElement();
179 recv_buf->resize(total_size);
180 MutableMemoryView recv_mem_view(recv_buf->memoryView());
// NOTE(review): the declaration/increment of 'index' (write offset) is elided.
182 for( Integer i=0; i<m_nb_rank; ++i ){
183 ConstMemoryView view(m_all_dispatchs_base[i]->m_const_view);
184 Int64 size = view.nbElement();
185 recv_mem_view.subView(index,size).copyHost(view);
188 _collectiveBarrier();
// Variable-size scatter: each rank publishes its send and receive views; the
// copy loop fills every rank's receive view from consecutive slices of the
// sender's buffer.
// NOTE(review): the lines restricting the copy loop to the 'root' rank and
// the declaration/increment of 'index' are elided in this extract — as shown,
// only one rank should execute the distribution loop; confirm against the
// full source.
194void SharedMemoryParallelDispatchBase::
195_genericScatterVariable(ConstMemoryView send_buf,MutableMemoryView recv_buf,
Int32 root)
197 m_const_view = send_buf;
198 m_recv_view = recv_buf;
199 _collectiveBarrier();
201 ConstMemoryView const_view(m_const_view);
203 for( Integer i=0; i<m_nb_rank; ++i ){
204 MutableMemoryView view(m_all_dispatchs_base[i]->m_recv_view);
205 Int64 size = view.nbElement();
206 view.copyHost(const_view.subView(index,size));
210 _collectiveBarrier();
// Point-to-point send through the shared-memory message queue. Stamps the
// emitter rank on a copy of the message info, enqueues the send, and — when
// the message is blocking — waits for its completion. Only rank/tag-addressed
// messages are supported; MessageId-based sends throw NotSupportedException.
// NOTE(review): the 'return' statements, the is_blocking guard and the
// closing braces are elided in this extract.
216Request SharedMemoryParallelDispatchBase::
217_genericSend(ConstMemoryView send_buffer,
const PointToPointMessageInfo& message2)
219 PointToPointMessageInfo message(message2);
220 message.setEmiterRank(MessageRank(m_rank));
221 bool is_blocking = message.isBlocking();
222 if (message.isRankTag()){
223 Request r = m_message_queue->addSend(message,SendBufferInfo(send_buffer));
225 m_message_queue->waitAll(ArrayView<Request>(1,&r));
230 if (message.isMessageId()){
232 ARCCORE_THROW(NotSupportedException,
"Invalid generic send with MessageId");
234 ARCCORE_THROW(NotSupportedException,
"Invalid message_info");
// Point-to-point receive through the shared-memory message queue. Stamps the
// emitter rank, posts the receive, and — when blocking — waits for it.
// NOTE(review): the is_blocking guard around waitAll and the 'return r;' line
// are elided in this extract.
240Request SharedMemoryParallelDispatchBase::
241_genericReceive(MutableMemoryView recv_buffer,
const PointToPointMessageInfo& message2)
243 PointToPointMessageInfo message(message2);
244 bool is_blocking = message.isBlocking();
245 message.setEmiterRank(MessageRank(m_rank));
246 ReceiveBufferInfo buf{recv_buffer};
247 Request r = m_message_queue->addReceive(message,buf);
249 m_message_queue->waitAll(ArrayView<Request>(1,&r));
// Broadcast: every rank publishes its view in m_broadcast_view; after the
// barrier each rank copies the content of 'rank''s view into its own buffer
// (the root performs a self-copy). The trailing barrier keeps the source view
// alive until all ranks have read it.
258void SharedMemoryParallelDispatchBase::
259_genericBroadcast(MutableMemoryView send_buf,
Int32 rank)
261 m_broadcast_view = send_buf;
262 _collectiveBarrier();
263 m_broadcast_view.copyHost(m_all_dispatchs_base[rank]->m_broadcast_view);
264 _collectiveBarrier();
// Typed dispatcher constructor: forwards the shared containers to the base
// class, resets the reduce call counter, and registers this instance in both
// the typed and the base per-rank dispatcher arrays so peers can reach it.
273template<
class Type> SharedMemoryParallelDispatch<Type>::
274SharedMemoryParallelDispatch(ITraceMng* tm,SharedMemoryParallelMng* parallel_mng,
275                             ISharedMemoryMessageQueue* message_queue,
276                             impl::ShareMemoryDispatcherContainer<Type>& containers)
277: BaseClass(tm,parallel_mng,message_queue,containers.all_dispatchs_base)
278, m_all_dispatchs(containers.all_dispatchs)
280 m_reduce_infos.m_index = 0;
281 m_all_dispatchs[m_rank] =
this;
282 m_all_dispatchs_base[m_rank] =
this;
// Destructor.
// NOTE(review): the body is elided in this extract.
288template<
class Type> SharedMemoryParallelDispatch<Type>::
289~SharedMemoryParallelDispatch()
// NOTE(review): the member-function name and body that followed this
// qualified-name fragment are elided in this extract; which member this is
// cannot be determined from the visible text.
297template<
class Type>
void SharedMemoryParallelDispatch<Type>::
// Compile-time trait used to pick the min/max/sum reduction implementation:
// _ThreadIntegralType<T>::IsIntegral is FalseType by default and TrueType for
// each arithmetic type listed below (plus HPReal), via the macro-generated
// specializations.
// NOTE(review): the template header, class braces and macro backslash
// continuations are partially elided in this extract.
306class _ThreadIntegralType
309 typedef FalseType IsIntegral;
312#define ARCANE_DEFINE_INTEGRAL_TYPE(datatype)\
314class _ThreadIntegralType<datatype>\
317 typedef TrueType IsIntegral;\
320ARCANE_DEFINE_INTEGRAL_TYPE(
long long);
321ARCANE_DEFINE_INTEGRAL_TYPE(
long);
322ARCANE_DEFINE_INTEGRAL_TYPE(
int);
323ARCANE_DEFINE_INTEGRAL_TYPE(
short);
324ARCANE_DEFINE_INTEGRAL_TYPE(
unsigned long long);
325ARCANE_DEFINE_INTEGRAL_TYPE(
unsigned long);
326ARCANE_DEFINE_INTEGRAL_TYPE(
unsigned int);
327ARCANE_DEFINE_INTEGRAL_TYPE(
unsigned short);
328ARCANE_DEFINE_INTEGRAL_TYPE(
double);
329ARCANE_DEFINE_INTEGRAL_TYPE(
float);
330ARCANE_DEFINE_INTEGRAL_TYPE(HPReal);
// Fallback overload (selected for types whose IsIntegral trait is FalseType):
// min/max/sum reduction is not implemented for non-arithmetic element types.
// All parameters are deliberately unused.
// NOTE(review): the tail of the parameter list (min/max/sum out-params, rank
// out-params, nb_rank and the FalseType tag) is elided in this extract.
338template<
class Type>
void
339_computeMinMaxSum2(ArrayView<SharedMemoryParallelDispatch<Type>*> all_dispatchs,
343 ARCANE_UNUSED(all_dispatchs);
344 ARCANE_UNUSED(min_val);
345 ARCANE_UNUSED(max_val);
346 ARCANE_UNUSED(sum_val);
347 ARCANE_UNUSED(min_rank);
348 ARCANE_UNUSED(max_rank);
349 ARCANE_UNUSED(nb_rank);
351 throw NotImplementedException(A_FUNCINFO);
// Arithmetic overload (TrueType trait): scans every rank's published
// reduce_value, accumulating the minimum, maximum and sum, and reports the
// ranks holding the extrema.
// NOTE(review): the parameter-list tail, the min/max comparison statements
// inside the loop (which update _min_val/_max_val and _min_rank/_max_rank)
// and the min_val/max_val/sum_val out-assignments are elided in this extract.
357template<
class Type>
void
358_computeMinMaxSum2(ArrayView<SharedMemoryParallelDispatch<Type>*> all_dispatchs,
362 Type _min_val = all_dispatchs[0]->m_reduce_infos.reduce_value;
363 Type _max_val = _min_val;
364 Type _sum_val = _min_val;
367 for( Integer i=1; i<nb_rank; ++i ){
368 Type cval = all_dispatchs[i]->m_reduce_infos.reduce_value;
377 _sum_val = (
Type)(_sum_val + cval);
382 min_rank = _min_rank;
383 max_rank = _max_rank;
// Scalar min/max/sum collective: publishes the local value, synchronizes so
// every rank's value is visible, then dispatches to the trait-selected
// _computeMinMaxSum2 overload. The trailing barrier keeps the published
// values alive until all ranks have read them.
// NOTE(review): the signature line (value and out-parameters) is elided.
391template<
class Type>
void SharedMemoryParallelDispatch<Type>::
395 typedef typename _ThreadIntegralType<Type>::IsIntegral IntegralType;
396 m_reduce_infos.reduce_value = val;
397 _collectiveBarrier();
398 _computeMinMaxSum2(m_all_dispatchs,min_val,max_val,sum_val,min_rank,max_rank,m_nb_rank,IntegralType());
399 _collectiveBarrier();
// Element-wise min/max/sum over an array of values: performs one
// publish/barrier/reduce/barrier cycle per element, reusing the scalar
// reduce_value slot each iteration.
// NOTE(review): the declaration of 'n' (the element count) and the loop's
// closing brace are elided in this extract.
405template<
class Type>
void SharedMemoryParallelDispatch<Type>::
406computeMinMaxSum(ConstArrayView<Type> values,
407                 ArrayView<Type> min_values,
408                 ArrayView<Type> max_values,
409                 ArrayView<Type> sum_values,
410                 ArrayView<Int32> min_ranks,
411                 ArrayView<Int32> max_ranks)
415 typedef typename _ThreadIntegralType<Type>::IsIntegral IntegralType;
417 for(Integer i=0;i<n;++i) {
418 m_reduce_infos.reduce_value = values[i];
419 _collectiveBarrier();
420 _computeMinMaxSum2(m_all_dispatchs,min_values[i],max_values[i],sum_values[i],
421                    min_ranks[i],max_ranks[i],m_nb_rank,IntegralType());
422 _collectiveBarrier();
// Typed broadcast: wraps the span in a memory view and delegates to the
// type-erased base implementation.
429template<
class Type>
void SharedMemoryParallelDispatch<Type>::
430broadcast(Span<Type> send_buf,
Int32 rank)
432 _genericBroadcast(MutableMemoryView(send_buf),rank);
// Typed all-gather: delegates to the type-erased base implementation.
438template<
class Type>
void SharedMemoryParallelDispatch<Type>::
439allGather(Span<const Type> send_buf,Span<Type> recv_buf)
441 _genericAllGather(ConstMemoryView{send_buf},MutableMemoryView{recv_buf});
// Gather to 'root_rank', implemented over allGather: the root gathers
// directly into recv_buf, while every other rank gathers into a throwaway
// temporary (all ranks must still participate in the collective).
// NOTE(review): the 'else' keyword and closing braces between the two
// branches are elided in this extract.
447template<
class Type>
void SharedMemoryParallelDispatch<Type>::
448gather(Span<const Type> send_buf,Span<Type> recv_buf,
Int32 root_rank)
450 UniqueArray<Type> tmp_buf;
451 if (m_rank==root_rank)
452 allGather(send_buf,recv_buf);
454 tmp_buf.resize(send_buf.size() * m_nb_rank);
455 allGather(send_buf,tmp_buf);
// Variable-size all-gather: adapts the typed Array to the IResizableArray
// interface expected by the type-erased base implementation.
462template<
class Type>
void SharedMemoryParallelDispatch<Type>::
463allGatherVariable(Span<const Type> send_buf,Array<Type>& recv_buf)
465 ResizableArrayRef recv_buf_ref(recv_buf);
466 _genericAllGatherVariable(ConstMemoryView(send_buf),&recv_buf_ref);
// Variable-size gather to 'root_rank', implemented over allGatherVariable:
// the root receives into recv_buf, other ranks into a discarded temporary.
// NOTE(review): the 'else' keyword between the two branches is elided.
472template<
class Type>
void SharedMemoryParallelDispatch<Type>::
473gatherVariable(Span<const Type> send_buf,Array<Type>& recv_buf,
Int32 root_rank)
475 UniqueArray<Type> tmp_buf;
476 if (m_rank==root_rank)
477 allGatherVariable(send_buf,recv_buf);
479 allGatherVariable(send_buf,tmp_buf);
// Typed variable-size scatter: delegates to the type-erased base
// implementation.
485template<
class Type>
void SharedMemoryParallelDispatch<Type>::
486scatterVariable(Span<const Type> send_buf,Span<Type> recv_buf,
Int32 root)
488 _genericScatterVariable(ConstMemoryView(send_buf),MutableMemoryView(recv_buf),root);
// Typed fixed-size all-to-all: delegates to the type-erased base
// implementation.
494template<
class Type>
void SharedMemoryParallelDispatch<Type>::
495allToAll(Span<const Type> send_buf,Span<Type> recv_buf,
Int32 count)
497 _genericAllToAll(ConstMemoryView(send_buf),MutableMemoryView(recv_buf),count);
// Typed variable-size all-to-all: forwards counts/indexes unchanged to the
// type-erased base implementation.
503template<
class Type>
void SharedMemoryParallelDispatch<Type>::
504allToAllVariable(Span<const Type> send_buf, ConstArrayView<Int32> send_count,
505                 ConstArrayView<Int32> send_index,
506                 Span<Type> recv_buf, ConstArrayView<Int32> recv_count,
507                 Int32ConstArrayView recv_index
510 _genericAllToAllVariable(ConstMemoryView(send_buf), send_count, send_index,
511                          MutableMemoryView(recv_buf), recv_count, recv_index);
// Send to 'rank' with an explicit blocking flag: builds a point-to-point
// message info with the requested blocking mode and delegates to the
// message-info overload.
517template<
class Type>
auto SharedMemoryParallelDispatch<Type>::
518send(Span<const Type> send_buffer,
Int32 rank,
bool is_blocking) -> Request
520 auto block_mode = (is_blocking) ? Parallel::Blocking : Parallel::NonBlocking;
521 auto p2p_message = m_parallel_mng->buildMessage(rank,block_mode);
522 return send(send_buffer,p2p_message);
// Convenience blocking send: forwards to the three-argument overload with
// is_blocking=true (the returned Request is discarded).
528template<
class Type>
void SharedMemoryParallelDispatch<Type>::
529send(ConstArrayView<Type> send_buf,
Int32 rank)
531 send(send_buf,rank,
true);
// Receive from 'rank' with an explicit blocking flag: builds the matching
// point-to-point message info and delegates to the message-info overload.
537template<
class Type> Parallel::Request SharedMemoryParallelDispatch<Type>::
538receive(Span<Type> recv_buffer,
Int32 rank,
bool is_blocking)
540 auto block_mode = (is_blocking) ? Parallel::Blocking : Parallel::NonBlocking;
541 auto p2p_message = m_parallel_mng->buildMessage(rank,block_mode);
542 return receive(recv_buffer,p2p_message);
// Typed send with explicit message info: delegates to the type-erased base
// implementation.
548template<
class Type> Request SharedMemoryParallelDispatch<Type>::
549send(Span<const Type> send_buffer,
const PointToPointMessageInfo& message2)
551 return _genericSend(ConstMemoryView(send_buffer),message2);
// Typed receive with explicit message info: delegates to the type-erased base
// implementation.
557template<
class Type> Request SharedMemoryParallelDispatch<Type>::
558receive(Span<Type> recv_buffer,
const PointToPointMessageInfo& message2)
560 return _genericReceive(MutableMemoryView(recv_buffer),message2);
// Convenience blocking receive: forwards to the three-argument recv overload
// with is_blocking=true.
566template<
class Type>
void SharedMemoryParallelDispatch<Type>::
567recv(ArrayView<Type> recv_buffer,Integer rank)
569 recv(recv_buffer,rank,
true);
// Combined send/receive with a single peer: not implemented for the
// shared-memory dispatcher; always throws.
575template<
class Type>
void SharedMemoryParallelDispatch<Type>::
576sendRecv(ConstArrayView<Type> send_buffer,ArrayView<Type> recv_buffer,Integer proc)
578 ARCANE_UNUSED(send_buffer);
579 ARCANE_UNUSED(recv_buffer);
581 throw NotImplementedException(A_FUNCINFO);
// Scalar all-reduce: publishes the local value, synchronizes, then every rank
// independently folds all ranks' published values with the requested
// operation (min/max/sum). The trailing barrier keeps the published values
// alive until all ranks have finished reading them.
// NOTE(review): the 'switch(op)' statement, 'break's, the default/error case
// and the 'return ret;' line are elided in this extract.
587template<
class Type>
Type SharedMemoryParallelDispatch<Type>::
588allReduce(eReduceType op,
Type send_buf)
590 m_reduce_infos.reduce_value = send_buf;
593 _collectiveBarrier();
594 Type ret = m_all_dispatchs[0]->m_reduce_infos.reduce_value;
596 case Parallel::ReduceMin:
597 for( Integer i=1; i<m_nb_rank; ++i )
598 ret = math::min(ret,m_all_dispatchs[i]->m_reduce_infos.reduce_value);
600 case Parallel::ReduceMax:
601 for( Integer i=1; i<m_nb_rank; ++i )
602 ret = math::max(ret,m_all_dispatchs[i]->m_reduce_infos.reduce_value);
604 case Parallel::ReduceSum:
605 for( Integer i=1; i<m_nb_rank; ++i )
606 ret = (
Type)(ret + m_all_dispatchs[i]->m_reduce_infos.reduce_value);
612 _collectiveBarrier();
// Buffer-wise all-reduce or (inclusive) scan. Each rank publishes its buffer
// and bumps a per-dispatcher call counter; after the barrier, the counters of
// all ranks are checked for coherence (a mismatch means ranks issued a
// different number of collectives — fatal). For a scan, only ranks
// 0..m_rank are folded; for a reduce, all ranks are. The result is computed
// into a temporary and copied back into send_buf after the final barrier.
619template<
class Type>
void SharedMemoryParallelDispatch<Type>::
620_allReduceOrScan(eReduceType op, Span<Type> send_buf,
bool is_scan)
622 m_reduce_infos.reduce_buf = send_buf;
623 ++m_reduce_infos.m_index;
624 Int64 buf_size = send_buf.size();
625 UniqueArray<Type> ret(buf_size);
628 _collectiveBarrier();
630 Integer index0 = m_all_dispatchs[0]->m_reduce_infos.m_index;
631 for( Integer i=0; i<m_nb_rank; ++i ){
632 Integer indexi = m_all_dispatchs[i]->m_reduce_infos.m_index;
633 if (index0!=m_all_dispatchs[i]->m_reduce_infos.m_index){
634 ARCANE_FATAL(
"INTERNAL: incoherent all reduce i0={0} in={1} n={2}",
639 Int32 nb_rank = m_nb_rank;
// NOTE(review): the 'if (is_scan)' guard around the next line is elided in
// this extract — confirm the scan path limits the fold to ranks 0..m_rank.
641 nb_rank = m_rank + 1;
642 for( Integer j=0; j<buf_size; ++j )
643 ret[j] = m_all_dispatchs[0]->m_reduce_infos.reduce_buf[j];
// NOTE(review): the 'switch(op)' statement, 'break's and default case are
// elided in this extract.
645 case Parallel::ReduceMin:
646 for (Integer i = 1; i < nb_rank; ++i)
647 for( Integer j=0; j<buf_size; ++j )
648 ret[j] = math::min(ret[j],m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
650 case Parallel::ReduceMax:
651 for (Integer i = 1; i < nb_rank; ++i)
652 for( Integer j=0; j<buf_size; ++j )
653 ret[j] = math::max(ret[j],m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
655 case Parallel::ReduceSum:
656 for (Integer i = 1; i < nb_rank; ++i)
657 for( Integer j=0; j<buf_size; ++j )
658 ret[j] = (
Type)(ret[j] + m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
664 _collectiveBarrier();
665 for( Integer j=0; j<buf_size; ++j )
666 send_buf[j] = ret[j];
// Buffer all-reduce: delegates to _allReduceOrScan with is_scan=false.
672template <
class Type>
void SharedMemoryParallelDispatch<Type>::
673allReduce(eReduceType op, Span<Type> send_buf)
675 _allReduceOrScan(op, send_buf,
false);
// Non-blocking all-reduce: not implemented for the shared-memory dispatcher.
681template<
class Type> Request SharedMemoryParallelDispatch<Type>::
682nonBlockingAllReduce(eReduceType op,Span<const Type> send_buf,Span<Type> recv_buf)
685 ARCANE_UNUSED(send_buf);
686 ARCANE_UNUSED(recv_buf);
687 throw NotImplementedException(A_FUNCINFO);
// Non-blocking all-gather: not implemented for the shared-memory dispatcher.
693template<
class Type> Request SharedMemoryParallelDispatch<Type>::
694nonBlockingAllGather(Span<const Type> send_buf, Span<Type> recv_buf)
696 ARCANE_UNUSED(send_buf);
697 ARCANE_UNUSED(recv_buf);
698 throw NotImplementedException(A_FUNCINFO);
// Non-blocking broadcast: not implemented for the shared-memory dispatcher.
704template<
class Type> Request SharedMemoryParallelDispatch<Type>::
705nonBlockingBroadcast(Span<Type> send_buf,
Int32 rank)
707 ARCANE_UNUSED(send_buf);
709 throw NotImplementedException(A_FUNCINFO);
// Non-blocking gather: not implemented for the shared-memory dispatcher.
715template<
class Type> Request SharedMemoryParallelDispatch<Type>::
716nonBlockingGather(Span<const Type> send_buf, Span<Type> recv_buf,
Int32 rank)
718 ARCANE_UNUSED(send_buf);
719 ARCANE_UNUSED(recv_buf);
721 throw NotImplementedException(A_FUNCINFO);
// Non-blocking all-to-all: not implemented for the shared-memory dispatcher.
727template<
class Type> Request SharedMemoryParallelDispatch<Type>::
728nonBlockingAllToAll(Span<const Type> send_buf, Span<Type> recv_buf,
Int32 count)
730 ARCANE_UNUSED(send_buf);
731 ARCANE_UNUSED(recv_buf);
732 ARCANE_UNUSED(count);
733 throw NotImplementedException(A_FUNCINFO);
// Non-blocking variable-size all-to-all: not implemented for the
// shared-memory dispatcher.
739template<
class Type> Request SharedMemoryParallelDispatch<Type>::
740nonBlockingAllToAllVariable(Span<const Type> send_buf, ConstArrayView<Int32> send_count,
741                            ConstArrayView<Int32> send_index, Span<Type> recv_buf,
742                            ConstArrayView<Int32> recv_count, ConstArrayView<Int32> recv_index)
744 ARCANE_UNUSED(send_buf);
745 ARCANE_UNUSED(recv_buf);
746 ARCANE_UNUSED(send_count);
747 ARCANE_UNUSED(recv_count);
748 ARCANE_UNUSED(send_index);
749 ARCANE_UNUSED(recv_index);
750 throw NotImplementedException(A_FUNCINFO);
// Scalar scan: not implemented for the shared-memory dispatcher (only the
// buffer overload below is supported).
756template<
class Type>
Type SharedMemoryParallelDispatch<Type>::
757scan(eReduceType op,
Type send_buf)
760 ARCANE_UNUSED(send_buf);
761 throw NotImplementedException(A_FUNCINFO);
// Buffer scan: delegates to _allReduceOrScan with is_scan=true.
767template<
class Type>
void SharedMemoryParallelDispatch<Type>::
768scan(eReduceType op,ArrayView<Type> send_buf)
770 _allReduceOrScan(op, send_buf,
true);
// NOTE(review): the member-function name and signature are elided in this
// extract; only its not-implemented body is visible.
776template<
class Type>
void SharedMemoryParallelDispatch<Type>::
780 throw NotImplementedException(A_FUNCINFO);
// NOTE(review): the member-function name and signature are elided in this
// extract; only its not-implemented body (returning Request) is visible.
786template<
class Type> Request SharedMemoryParallelDispatch<Type>::
789 throw NotImplementedException(A_FUNCINFO);
// Explicit instantiations of the dispatcher for every data type supported by
// the parallel manager (built-in arithmetic types plus the Arcane numeric
// vocabulary types Real2/Real3/Real2x2/Real3x3, APReal and HPReal).
795template class SharedMemoryParallelDispatch<char>;
796template class SharedMemoryParallelDispatch<signed char>;
797template class SharedMemoryParallelDispatch<unsigned char>;
798template class SharedMemoryParallelDispatch<short>;
799template class SharedMemoryParallelDispatch<unsigned short>;
800template class SharedMemoryParallelDispatch<int>;
801template class SharedMemoryParallelDispatch<unsigned int>;
802template class SharedMemoryParallelDispatch<long>;
803template class SharedMemoryParallelDispatch<unsigned long>;
804template class SharedMemoryParallelDispatch<long long>;
805template class SharedMemoryParallelDispatch<unsigned long long>;
806template class SharedMemoryParallelDispatch<float>;
807template class SharedMemoryParallelDispatch<double>;
808template class SharedMemoryParallelDispatch<long double>;
809template class SharedMemoryParallelDispatch<APReal>;
810template class SharedMemoryParallelDispatch<Real2>;
811template class SharedMemoryParallelDispatch<Real3>;
812template class SharedMemoryParallelDispatch<Real2x2>;
813template class SharedMemoryParallelDispatch<Real3x3>;
814template class SharedMemoryParallelDispatch<HPReal>;
#define ARCANE_FATAL(...)
Macro that throws a FatalErrorException.
Reader of mesh files via the LIMA library.
Information for a 'gather' message for the DataType data type.
Declarations of the types and methods used by the message-exchange mechanisms.
UniqueArray< Int32 > Int32UniqueArray
One-dimensional dynamic array of 32-bit integers.
Int32 Integer
Type representing an integer.