14#include "arcane/utils/ArcanePrecomp.h"
16#include "arcane/utils/Array.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/String.h"
19#include "arcane/utils/ITraceMng.h"
20#include "arcane/utils/NumericTypes.h"
21#include "arcane/utils/APReal.h"
22#include "arcane/utils/NotImplementedException.h"
23#include "arcane/utils/MemoryView.h"
26#include "arcane/core/MeshVariable.h"
27#include "arcane/core/IParallelMng.h"
28#include "arcane/core/ItemGroup.h"
29#include "arcane/core/IMesh.h"
30#include "arcane/core/IBase.h"
32#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
33#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
34#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
36#include "arccore/message_passing/PointToPointMessageInfo.h"
// Constructor of the type-independent part of the shared-memory dispatcher.
// Stores the parallel manager and caches its rank (commRank) and number of
// ranks (commSize). Keeps the message queue used for point-to-point messages.
// Keeps the array of all per-rank dispatchers, which the collective operations
// read to reach the other ranks' published buffers.
// NOTE(review): this listing is missing part of the definition: the first
// parameter line, the initializer(s) before m_parallel_mng, and the braces.
68SharedMemoryParallelDispatchBase::
71 ArrayView<SharedMemoryParallelDispatchBase*> all_dispatchs_base)
73, m_parallel_mng(parallel_mng)
74, m_rank(parallel_mng->commRank())
75, m_nb_rank(parallel_mng->commSize())
76, m_message_queue(message_queue)
77, m_all_dispatchs_base(all_dispatchs_base)
// Collective synchronisation point: waits on the parallel manager's thread
// barrier so that no rank (thread) proceeds until all of them have arrived.
// NOTE(review): the function-name line is missing from this listing. The
// collective operations in this file call it as _collectiveBarrier().
84void SharedMemoryParallelDispatchBase::
87 m_parallel_mng->getThreadBarrier()->wait();
// Fixed-size all-to-all built on top of _genericAllToAllVariable().
// Fills per-rank send and receive offsets equal to count * i, then delegates.
// NOTE(review): the declarations of send_count, recv_count, send_indexes and
// recv_indexes are missing from this listing. Presumably they are
// nb_rank-sized arrays with every count equal to `count`; confirm against the
// full source.
93void SharedMemoryParallelDispatchBase::
94_genericAllToAll(ConstMemoryView send_buf, MutableMemoryView recv_buf, Int32 count)
96 Int32 nb_rank = m_nb_rank;
104 for (Integer i = 0; i < nb_rank; ++i) {
105 send_indexes[i] = count * i;
106 recv_indexes[i] = count * i;
108 _genericAllToAllVariable(send_buf, send_count, send_indexes, recv_buf, recv_count, recv_indexes);
// Variable-size all-to-all between the ranks (threads) of this shared-memory
// parallel manager.
//
// Protocol:
//  1. Each rank publishes its buffers, counts and offsets in m_alltoallv_infos.
//  2. Barrier, so every rank's description is available to the others.
//  3. For each rank i in rank order, copy send_count[my_rank] elements from
//     rank i's send buffer, starting at send_index[my_rank]. Each slice is
//     written into `recv_buf` at the running offset `global_index`.
//  4. Barrier again before returning.
//
// NOTE(review): the copy uses only the *sender's* send_count/send_index. This
// rank's recv_count/recv_index are published but not read in the visible loop,
// so recv_buf is filled contiguously in rank order whatever recv_index says.
// The declaration of `global_index` (presumably initialised to 0) is missing
// from this listing.
114void SharedMemoryParallelDispatchBase::
115_genericAllToAllVariable(ConstMemoryView send_buf,
116 Span<const Int32> send_count,
117 Span<const Int32> send_index,
118 MutableMemoryView recv_buf,
119 Span<const Int32> recv_count,
120 Span<const Int32> recv_index)
122 m_alltoallv_infos.send_buf = send_buf;
123 m_alltoallv_infos.send_count = send_count;
124 m_alltoallv_infos.send_index = send_index;
125 m_alltoallv_infos.recv_buf = recv_buf;
126 m_alltoallv_infos.recv_count = recv_count;
127 m_alltoallv_infos.recv_index = recv_index;
// Wait until every rank has published its description.
128 _collectiveBarrier();
130 Int32 my_rank = m_rank;
131 MutableMemoryView recv_mem_buf(recv_buf);
132 for (Integer i = 0; i < m_nb_rank; ++i) {
// Local copy (by value) of rank i's published description.
133 AllToAllVariableInfo ainfo = m_all_dispatchs_base[i]->m_alltoallv_infos;
134 ConstMemoryView view(ainfo.send_buf);
135 Integer index = ainfo.send_index[my_rank];
136 Integer count = ainfo.send_count[my_rank];
137 MemoryUtils::copyHost(recv_mem_buf.subView(global_index, count), view.subView(index, count));
138 global_index += count;
// Presumably prevents a rank from returning, and reusing or releasing the
// buffers it published, while other ranks may still be reading them.
140 _collectiveBarrier();
// All-gather into a caller-sized buffer.
// Each rank publishes its send view in m_const_view. After a barrier, every
// rank copies each rank's view, in rank order, into `recv_buf` at a running
// offset. The final barrier presumably keeps the published views valid until
// every rank has finished copying.
// `recv_buf` must be large enough for the concatenation of all contributions;
// no explicit size check is visible here.
// NOTE(review): the declaration and increment of `index` are missing from this
// listing. Presumably it starts at 0 and advances by `size` each iteration.
146void SharedMemoryParallelDispatchBase::
147_genericAllGather(ConstMemoryView send_buf, MutableMemoryView recv_buf)
149 m_const_view = send_buf;
150 _collectiveBarrier();
151 MutableMemoryView recv_mem_view(recv_buf);
153 for (Int32 i = 0; i < m_nb_rank; ++i) {
154 ConstMemoryView view(m_all_dispatchs_base[i]->m_const_view);
155 Int64 size = view.nbElement();
156 MemoryUtils::copyHost(recv_mem_view.subView(index, size), view);
159 _collectiveBarrier();
// All-gather of contributions whose sizes may differ between ranks.
// Each rank publishes its send view. After a barrier, it sums the element
// counts of all ranks, resizes `recv_buf` to that total, and copies every
// rank's view into it in rank order. The final barrier presumably keeps the
// published views valid until every rank has finished copying.
// NOTE(review): the declaration and increment of `index` are missing from this
// listing. Presumably it starts at 0 and advances by `size` each iteration.
165void SharedMemoryParallelDispatchBase::
166_genericAllGatherVariable(ConstMemoryView send_buf, IResizableArray* recv_buf)
168 m_const_view = send_buf;
169 _collectiveBarrier();
170 Int64 total_size = 0;
171 for (Integer i = 0; i < m_nb_rank; ++i) {
172 total_size += m_all_dispatchs_base[i]->m_const_view.nbElement();
// Size the destination before taking its memory view.
174 recv_buf->resize(total_size);
175 MutableMemoryView recv_mem_view(recv_buf->memoryView());
177 for (Integer i = 0; i < m_nb_rank; ++i) {
178 ConstMemoryView view(m_all_dispatchs_base[i]->m_const_view);
179 Int64 size = view.nbElement();
180 MemoryUtils::copyHost(recv_mem_view.subView(index, size), view);
183 _collectiveBarrier();
// Scatter from `root`: every rank publishes both its send view and its
// receive view. After a barrier, only the root copies consecutive slices of its
// send buffer into each rank's receive view. The slice for rank i has the
// element count of rank i's receive view. The final barrier ensures the root
// has finished writing before the other ranks continue.
// NOTE(review): the declaration and increment of the running offset `index`
// are missing from this listing. Presumably it starts at 0 and advances by
// `size`.
189void SharedMemoryParallelDispatchBase::
190_genericScatterVariable(ConstMemoryView send_buf, MutableMemoryView recv_buf, Int32 root)
192 m_const_view = send_buf;
193 m_recv_view = recv_buf;
194 _collectiveBarrier();
195 if (m_rank == root) {
196 ConstMemoryView const_view(m_const_view);
198 for (Integer i = 0; i < m_nb_rank; ++i) {
199 MutableMemoryView view(m_all_dispatchs_base[i]->m_recv_view);
200 Int64 size = view.nbElement();
201 MemoryUtils::copyHost(view, const_view.subView(index, size));
205 _collectiveBarrier();
// Point-to-point send through the shared message queue.
// Copies the message info and stamps this rank as the emitter. Only
// rank/tag-addressed messages are handled: they are queued with addSend() and
// waited on with waitAll(). MessageId-addressed messages throw
// NotSupportedException, and so does any other kind of message info.
// NOTE(review): the `is_blocking` guard around waitAll() and the return
// statements are on lines missing from this listing. The blocking behaviour is
// inferred from `is_blocking` being read here.
211Request SharedMemoryParallelDispatchBase::
212_genericSend(ConstMemoryView send_buffer,
const PointToPointMessageInfo& message2)
214 PointToPointMessageInfo message(message2);
215 message.setEmiterRank(MessageRank(m_rank));
216 bool is_blocking = message.isBlocking();
217 if (message.isRankTag()) {
218 Request r = m_message_queue->addSend(message, SendBufferInfo(send_buffer));
220 m_message_queue->waitAll(ArrayView<Request>(1, &r));
225 if (message.isMessageId()) {
227 ARCCORE_THROW(NotSupportedException,
"Invalid generic send with MessageId");
229 ARCCORE_THROW(NotSupportedException,
"Invalid message_info");
// Point-to-point receive through the shared message queue.
// Copies the message info and stamps this rank as the emitter, as the send
// path does. It then posts the receive with addReceive() and waits on it with
// waitAll(), returning an empty Request.
// NOTE(review): the `is_blocking` guard and the non-blocking return path are
// on lines missing from this listing.
235Request SharedMemoryParallelDispatchBase::
236_genericReceive(MutableMemoryView recv_buffer,
const PointToPointMessageInfo& message2)
238 PointToPointMessageInfo message(message2);
239 bool is_blocking = message.isBlocking();
240 message.setEmiterRank(MessageRank(m_rank));
241 ReceiveBufferInfo buf{ recv_buffer };
242 Request r = m_message_queue->addReceive(message, buf);
244 m_message_queue->waitAll(ArrayView<Request>(1, &r));
245 return MP::Request();
// Broadcast from rank `rank`: every rank publishes its buffer. After a barrier,
// each rank copies the root's published buffer into its own. The final barrier
// presumably keeps the root's buffer valid until all copies are done.
// NOTE(review): the root itself also runs the copy, and its source and
// destination are the same view (m_all_dispatchs_base[rank] is `this` there).
// Confirm MemoryUtils::copyHost tolerates a self-copy; a plain memcpy on
// overlapping ranges is undefined behaviour.
253void SharedMemoryParallelDispatchBase::
254_genericBroadcast(MutableMemoryView send_buf, Int32 rank)
256 m_broadcast_view = send_buf;
257 _collectiveBarrier();
258 MemoryUtils::copyHost(m_broadcast_view, m_all_dispatchs_base[rank]->m_broadcast_view);
259 _collectiveBarrier();
// Typed dispatcher constructor.
// Forwards the untyped dispatcher array of `containers` to the base class and
// keeps the typed array. It then registers `this` at index m_rank in both
// arrays, so other ranks can read this rank's published reduction data.
// It also resets m_reduce_infos.m_index, the counter that _allReduceOrScan()
// increments and compares across ranks.
268template <
class Type> SharedMemoryParallelDispatch<Type>::
269SharedMemoryParallelDispatch(ITraceMng* tm, SharedMemoryParallelMng* parallel_mng,
270 ISharedMemoryMessageQueue* message_queue,
271 impl::ShareMemoryDispatcherContainer<Type>& containers)
272: BaseClass(tm, parallel_mng, message_queue, containers.all_dispatchs_base)
273, m_all_dispatchs(containers.all_dispatchs)
275 m_reduce_infos.m_index = 0;
276 m_all_dispatchs[m_rank] =
this;
277 m_all_dispatchs_base[m_rank] =
this;
// Destructor of the typed dispatcher.
// NOTE(review): its body is not visible in this listing.
283template <
class Type> SharedMemoryParallelDispatch<Type>::
284~SharedMemoryParallelDispatch()
// NOTE(review): a member function of SharedMemoryParallelDispatch<Type> whose
// name and body are missing from this listing; only its template header and
// return type survive.
292template <
class Type>
void SharedMemoryParallelDispatch<Type>::
// Type trait that selects which _computeMinMaxSum2() overload is used.
// By default IsIntegral is FalseType, which routes computeMinMaxSum() to the
// overload that throws NotImplementedException.
301class _ThreadIntegralType
305 typedef FalseType IsIntegral;
// Declares a specialisation with IsIntegral = TrueType. This enables the
// generic min/max/sum implementation (based on <, > and +) for `datatype`.
// NOTE(review): despite the name, the list below also contains floating-point
// types and HPReal.
// Some types are explicitly instantiated at the end of this file but are not
// listed here: char, signed char, unsigned char, long double, APReal, Real2,
// Real3, Real2x2 and Real3x3. For these, computeMinMaxSum() falls back to the
// throwing overload.
308#define ARCANE_DEFINE_INTEGRAL_TYPE(datatype) \
310 class _ThreadIntegralType<datatype> \
314 typedef TrueType IsIntegral; \
317ARCANE_DEFINE_INTEGRAL_TYPE(
long long);
318ARCANE_DEFINE_INTEGRAL_TYPE(
long);
319ARCANE_DEFINE_INTEGRAL_TYPE(
int);
320ARCANE_DEFINE_INTEGRAL_TYPE(
short);
321ARCANE_DEFINE_INTEGRAL_TYPE(
unsigned long long);
322ARCANE_DEFINE_INTEGRAL_TYPE(
unsigned long);
323ARCANE_DEFINE_INTEGRAL_TYPE(
unsigned int);
324ARCANE_DEFINE_INTEGRAL_TYPE(
unsigned short);
325ARCANE_DEFINE_INTEGRAL_TYPE(
double);
326ARCANE_DEFINE_INTEGRAL_TYPE(
float);
327ARCANE_DEFINE_INTEGRAL_TYPE(HPReal);
// Fallback for types whose _ThreadIntegralType trait is FalseType.
// Min/max/sum is not supported for them, so this overload always throws
// NotImplementedException.
// NOTE(review): the parameter line declaring min_val/max_val/sum_val is
// missing from this listing.
335 template <
class Type>
void
336 _computeMinMaxSum2(ArrayView<SharedMemoryParallelDispatch<Type>*> all_dispatchs,
338 Int32& min_rank, Int32& max_rank, Int32 nb_rank, FalseType)
340 ARCANE_UNUSED(all_dispatchs);
341 ARCANE_UNUSED(min_val);
342 ARCANE_UNUSED(max_val);
343 ARCANE_UNUSED(sum_val);
344 ARCANE_UNUSED(min_rank);
345 ARCANE_UNUSED(max_rank);
346 ARCANE_UNUSED(nb_rank);
348 throw NotImplementedException(A_FUNCINFO);
// Min/max/sum over the values each rank published in
// m_reduce_infos.reduce_value. The fold starts from rank 0's value and visits
// ranks in increasing order, so every calling rank gets the same result.
// Comparisons are strict (< and >), so on ties the lowest rank presumably keeps
// the min/max position. The running sum is cast back to Type after each
// addition, which matters for types promoted to int such as short.
// NOTE(review): several lines are missing from this listing: the
// min_val/max_val/sum_val parameter line, the declarations of
// _min_rank/_max_rank, the min/max updates inside the loop, and the stores to
// min_val/max_val/sum_val.
354 template <
class Type>
void
355 _computeMinMaxSum2(ArrayView<SharedMemoryParallelDispatch<Type>*> all_dispatchs,
357 Int32& min_rank, Int32& max_rank, Int32 nb_rank, TrueType)
359 Type _min_val = all_dispatchs[0]->m_reduce_infos.reduce_value;
360 Type _max_val = _min_val;
361 Type _sum_val = _min_val;
364 for (Integer i = 1; i < nb_rank; ++i) {
365 Type cval = all_dispatchs[i]->m_reduce_infos.reduce_value;
366 if (cval < _min_val) {
370 if (cval > _max_val) {
374 _sum_val = (
Type)(_sum_val + cval);
379 min_rank = _min_rank;
380 max_rank = _max_rank;
// Scalar min/max/sum reduction across all ranks. It also reports the ranks
// holding the min and the max.
// Each rank publishes its value and waits for the others. It then computes the
// result locally with _computeMinMaxSum2(); the overload is selected by
// _ThreadIntegralType<Type>. The second barrier presumably stops a rank from
// overwriting reduce_value (e.g. in the next reduction) while others still
// read it.
// NOTE(review): the function-name line (declaring `val`, min_val, max_val and
// sum_val) is missing from this listing.
388template <
class Type>
void SharedMemoryParallelDispatch<Type>::
390 Int32& min_rank, Int32& max_rank)
392 typedef typename _ThreadIntegralType<Type>::IsIntegral IntegralType;
393 m_reduce_infos.reduce_value = val;
394 _collectiveBarrier();
395 _computeMinMaxSum2(m_all_dispatchs, min_val, max_val, sum_val, min_rank, max_rank, m_nb_rank, IntegralType());
396 _collectiveBarrier();
// Element-wise min/max/sum across ranks for each entry of `values`.
// Each element goes through the scalar protocol: publish, barrier, local
// reduction, barrier. That is 2 * n barriers for n values.
// NOTE(review): the declaration of `n` (presumably values.size()) is missing
// from this listing. Publishing the whole array once would need only two
// barriers; consider it if this shows up in profiles.
402template <
class Type>
void SharedMemoryParallelDispatch<Type>::
403computeMinMaxSum(ConstArrayView<Type> values,
404 ArrayView<Type> min_values,
405 ArrayView<Type> max_values,
406 ArrayView<Type> sum_values,
407 ArrayView<Int32> min_ranks,
408 ArrayView<Int32> max_ranks)
412 typedef typename _ThreadIntegralType<Type>::IsIntegral IntegralType;
414 for (Integer i = 0; i < n; ++i) {
415 m_reduce_infos.reduce_value = values[i];
416 _collectiveBarrier();
417 _computeMinMaxSum2(m_all_dispatchs, min_values[i], max_values[i], sum_values[i],
418 min_ranks[i], max_ranks[i], m_nb_rank, IntegralType());
419 _collectiveBarrier();
// Typed broadcast from `rank`: forwards to _genericBroadcast() with an untyped
// mutable memory view of `send_buf`.
426template <
class Type>
void SharedMemoryParallelDispatch<Type>::
427broadcast(Span<Type> send_buf, Int32 rank)
429 _genericBroadcast(MutableMemoryView(send_buf), rank);
// Typed all-gather: forwards to _genericAllGather() with untyped memory views.
// `recv_buf` must hold the contributions of all ranks.
435template <
class Type>
void SharedMemoryParallelDispatch<Type>::
436allGather(Span<const Type> send_buf, Span<Type> recv_buf)
438 _genericAllGather(ConstMemoryView{ send_buf }, MutableMemoryView{ recv_buf });
// Gather to `root_rank`, implemented as an all-gather.
// The root receives directly into `recv_buf`. The other ranks receive into a
// temporary buffer of send_buf.size() * m_nb_rank elements, which is then
// discarded.
// NOTE(review): the `else` branch delimiter is missing from this listing. This
// approach gives every rank a full copy of the gathered data.
444template <
class Type>
void SharedMemoryParallelDispatch<Type>::
445gather(Span<const Type> send_buf, Span<Type> recv_buf, Int32 root_rank)
447 UniqueArray<Type> tmp_buf;
448 if (m_rank == root_rank)
449 allGather(send_buf, recv_buf);
451 tmp_buf.resize(send_buf.size() * m_nb_rank);
452 allGather(send_buf, tmp_buf);
// Variable-size all-gather into a resizable array.
// Wraps `recv_buf` in a ResizableArrayRef so that _genericAllGatherVariable()
// can resize it to the total size of all contributions.
459template <
class Type>
void SharedMemoryParallelDispatch<Type>::
460allGatherVariable(Span<const Type> send_buf, Array<Type>& recv_buf)
462 ResizableArrayRef recv_buf_ref(recv_buf);
463 _genericAllGatherVariable(ConstMemoryView(send_buf), &recv_buf_ref);
// Variable-size gather to `root_rank`, implemented as a variable all-gather.
// The root receives into `recv_buf`; the other ranks receive into a discarded
// temporary array.
// NOTE(review): the `else` branch delimiter is missing from this listing.
469template <
class Type>
void SharedMemoryParallelDispatch<Type>::
470gatherVariable(Span<const Type> send_buf, Array<Type>& recv_buf, Int32 root_rank)
472 UniqueArray<Type> tmp_buf;
473 if (m_rank == root_rank)
474 allGatherVariable(send_buf, recv_buf);
476 allGatherVariable(send_buf, tmp_buf);
// Typed variable scatter from `root`: forwards to _genericScatterVariable().
// Each rank receives as many elements as its `recv_buf` holds.
482template <
class Type>
void SharedMemoryParallelDispatch<Type>::
483scatterVariable(Span<const Type> send_buf, Span<Type> recv_buf, Int32 root)
485 _genericScatterVariable(ConstMemoryView(send_buf), MutableMemoryView(recv_buf), root);
// Typed fixed-size all-to-all with `count` elements per rank: forwards to
// _genericAllToAll().
491template <
class Type>
void SharedMemoryParallelDispatch<Type>::
492allToAll(Span<const Type> send_buf, Span<Type> recv_buf, Int32 count)
494 _genericAllToAll(ConstMemoryView(send_buf), MutableMemoryView(recv_buf), count);
// Typed variable-size all-to-all: forwards to _genericAllToAllVariable().
// The ConstArrayView<Int32> counts and offsets are passed where the generic
// version takes Span<const Int32>.
500template <
class Type>
void SharedMemoryParallelDispatch<Type>::
501allToAllVariable(Span<const Type> send_buf, ConstArrayView<Int32> send_count,
502 ConstArrayView<Int32> send_index,
503 Span<Type> recv_buf, ConstArrayView<Int32> recv_count,
504 Int32ConstArrayView recv_index)
506 _genericAllToAllVariable(ConstMemoryView(send_buf), send_count, send_index,
507 MutableMemoryView(recv_buf), recv_count, recv_index);
// Send `send_buffer` to `rank`, blocking or non-blocking according to
// `is_blocking`. Builds the PointToPointMessageInfo through the parallel
// manager and forwards to the message-info overload of send().
513template <
class Type>
auto SharedMemoryParallelDispatch<Type>::
514send(Span<const Type> send_buffer, Int32 rank,
bool is_blocking) -> Request
516 auto block_mode = (is_blocking) ? Parallel::Blocking : Parallel::NonBlocking;
517 auto p2p_message = m_parallel_mng->buildMessage(rank, block_mode);
518 return send(send_buffer, p2p_message);
// Blocking send of `send_buf` to `rank`. The Request returned by the
// three-argument overload is discarded.
524template <
class Type>
void SharedMemoryParallelDispatch<Type>::
525send(ConstArrayView<Type> send_buf, Int32 rank)
527 send(send_buf, rank,
true);
// Receive from `rank` into `recv_buffer`, blocking or non-blocking according
// to `is_blocking`. Builds the PointToPointMessageInfo through the parallel
// manager and forwards to the message-info overload of receive().
533template <
class Type> Parallel::Request SharedMemoryParallelDispatch<Type>::
534receive(Span<Type> recv_buffer, Int32 rank,
bool is_blocking)
536 auto block_mode = (is_blocking) ? Parallel::Blocking : Parallel::NonBlocking;
537 auto p2p_message = m_parallel_mng->buildMessage(rank, block_mode);
538 return receive(recv_buffer, p2p_message);
// Typed send described by a PointToPointMessageInfo: forwards to
// _genericSend() with an untyped const memory view.
544template <
class Type> Request SharedMemoryParallelDispatch<Type>::
545send(Span<const Type> send_buffer,
const PointToPointMessageInfo& message2)
547 return _genericSend(ConstMemoryView(send_buffer), message2);
// Typed receive described by a PointToPointMessageInfo: forwards to
// _genericReceive() with an untyped mutable memory view.
553template <
class Type> Request SharedMemoryParallelDispatch<Type>::
554receive(Span<Type> recv_buffer,
const PointToPointMessageInfo& message2)
556 return _genericReceive(MutableMemoryView(recv_buffer), message2);
// Blocking receive from `rank`. Forwards to a three-argument recv() overload
// with is_blocking = true; that overload is not visible in this listing.
562template <
class Type>
void SharedMemoryParallelDispatch<Type>::
563recv(ArrayView<Type> recv_buffer, Integer rank)
565 recv(recv_buffer, rank,
true);
// Combined send/receive with `proc`: not implemented for shared-memory
// parallelism; always throws NotImplementedException.
571template <
class Type>
void SharedMemoryParallelDispatch<Type>::
572sendRecv(ConstArrayView<Type> send_buffer, ArrayView<Type> recv_buffer, Integer proc)
574 ARCANE_UNUSED(send_buffer);
575 ARCANE_UNUSED(recv_buffer);
577 throw NotImplementedException(A_FUNCINFO);
// Scalar all-reduce with min, max or sum.
// Each rank publishes `send_buf` in m_reduce_infos and waits for the others.
// It then folds every rank's value, starting from rank 0 and going in rank
// order, so all ranks compute the same result in the same order. The final
// barrier presumably protects the published values until every rank has read
// them. Sums are cast back to Type after each addition.
// NOTE(review): the `switch (op)` line, the `break`s, the default case and the
// return statement are on lines missing from this listing.
583template <
class Type>
Type SharedMemoryParallelDispatch<Type>::
584allReduce(eReduceType op,
Type send_buf)
586 m_reduce_infos.reduce_value = send_buf;
589 _collectiveBarrier();
590 Type ret = m_all_dispatchs[0]->m_reduce_infos.reduce_value;
592 case Parallel::ReduceMin:
593 for (Integer i = 1; i < m_nb_rank; ++i)
594 ret = math::min(ret, m_all_dispatchs[i]->m_reduce_infos.reduce_value);
596 case Parallel::ReduceMax:
597 for (Integer i = 1; i < m_nb_rank; ++i)
598 ret = math::max(ret, m_all_dispatchs[i]->m_reduce_infos.reduce_value);
600 case Parallel::ReduceSum:
601 for (Integer i = 1; i < m_nb_rank; ++i)
602 ret = (
Type)(ret + m_all_dispatchs[i]->m_reduce_infos.reduce_value);
608 _collectiveBarrier();
// Shared implementation of the array all-reduce (is_scan == false) and the
// inclusive scan (is_scan == true).
// Each rank publishes its buffer and increments m_reduce_infos.m_index. After
// the barrier, the counters are compared against rank 0's, and any mismatch is
// a fatal error. Presumably a mismatch means the ranks are not executing the
// same sequence of reductions.
// For a scan, only ranks 0..m_rank are folded (nb_rank = m_rank + 1).
// The result is accumulated in the temporary `ret`. It is copied back into
// `send_buf` only after the second barrier, because until then other ranks
// read this rank's `send_buf` through reduce_buf.
// NOTE(review): several lines are missing from this listing: the `if (is_scan)`
// guard, the `switch (op)` line, and the remaining ARCANE_FATAL arguments.
// The comparison re-reads the member instead of using `indexi`.
// The `j` loops use Integer (Int32) against the Int64 `buf_size`, so buffers
// larger than INT32_MAX elements would overflow `j`.
615template <
class Type>
void SharedMemoryParallelDispatch<Type>::
616_allReduceOrScan(eReduceType op, Span<Type> send_buf,
bool is_scan)
618 m_reduce_infos.reduce_buf = send_buf;
619 ++m_reduce_infos.m_index;
620 Int64 buf_size = send_buf.size();
621 UniqueArray<Type> ret(buf_size);
624 _collectiveBarrier();
626 Integer index0 = m_all_dispatchs[0]->m_reduce_infos.m_index;
627 for (Integer i = 0; i < m_nb_rank; ++i) {
628 Integer indexi = m_all_dispatchs[i]->m_reduce_infos.m_index;
629 if (index0 != m_all_dispatchs[i]->m_reduce_infos.m_index) {
630 ARCANE_FATAL(
"INTERNAL: incoherent all reduce i0={0} in={1} n={2}",
635 Int32 nb_rank = m_nb_rank;
637 nb_rank = m_rank + 1;
638 for (Integer j = 0; j < buf_size; ++j)
639 ret[j] = m_all_dispatchs[0]->m_reduce_infos.reduce_buf[j];
641 case Parallel::ReduceMin:
642 for (Integer i = 1; i < nb_rank; ++i)
643 for (Integer j = 0; j < buf_size; ++j)
644 ret[j] = math::min(ret[j], m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
646 case Parallel::ReduceMax:
647 for (Integer i = 1; i < nb_rank; ++i)
648 for (Integer j = 0; j < buf_size; ++j)
649 ret[j] = math::max(ret[j], m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
651 case Parallel::ReduceSum:
652 for (Integer i = 1; i < nb_rank; ++i)
653 for (Integer j = 0; j < buf_size; ++j)
654 ret[j] = (
Type)(ret[j] + m_all_dispatchs[i]->m_reduce_infos.reduce_buf[j]);
// Only overwrite send_buf once no other rank can still be reading it.
660 _collectiveBarrier();
661 for (Integer j = 0; j < buf_size; ++j)
662 send_buf[j] = ret[j];
// In-place array all-reduce: every element of `send_buf` is replaced by the
// reduction over all ranks. Forwards to _allReduceOrScan() with is_scan = false.
668template <
class Type>
void SharedMemoryParallelDispatch<Type>::
669allReduce(eReduceType op, Span<Type> send_buf)
671 _allReduceOrScan(op, send_buf,
false);
// Non-blocking all-reduce: not implemented; always throws
// NotImplementedException.
677template <
class Type> Request SharedMemoryParallelDispatch<Type>::
678nonBlockingAllReduce(eReduceType op, Span<const Type> send_buf, Span<Type> recv_buf)
681 ARCANE_UNUSED(send_buf);
682 ARCANE_UNUSED(recv_buf);
683 throw NotImplementedException(A_FUNCINFO);
// Non-blocking all-gather: not implemented; always throws
// NotImplementedException.
689template <
class Type> Request SharedMemoryParallelDispatch<Type>::
690nonBlockingAllGather(Span<const Type> send_buf, Span<Type> recv_buf)
692 ARCANE_UNUSED(send_buf);
693 ARCANE_UNUSED(recv_buf);
694 throw NotImplementedException(A_FUNCINFO);
// Non-blocking broadcast: not implemented; always throws
// NotImplementedException.
700template <
class Type> Request SharedMemoryParallelDispatch<Type>::
701nonBlockingBroadcast(Span<Type> send_buf, Int32 rank)
703 ARCANE_UNUSED(send_buf);
705 throw NotImplementedException(A_FUNCINFO);
// Non-blocking gather: not implemented; always throws NotImplementedException.
711template <
class Type> Request SharedMemoryParallelDispatch<Type>::
712nonBlockingGather(Span<const Type> send_buf, Span<Type> recv_buf, Int32 rank)
714 ARCANE_UNUSED(send_buf);
715 ARCANE_UNUSED(recv_buf);
717 throw NotImplementedException(A_FUNCINFO);
// Non-blocking fixed-size all-to-all: not implemented; always throws
// NotImplementedException.
723template <
class Type> Request SharedMemoryParallelDispatch<Type>::
724nonBlockingAllToAll(Span<const Type> send_buf, Span<Type> recv_buf, Int32 count)
726 ARCANE_UNUSED(send_buf);
727 ARCANE_UNUSED(recv_buf);
728 ARCANE_UNUSED(count);
729 throw NotImplementedException(A_FUNCINFO);
// Non-blocking variable-size all-to-all: not implemented; always throws
// NotImplementedException.
735template <
class Type> Request SharedMemoryParallelDispatch<Type>::
736nonBlockingAllToAllVariable(Span<const Type> send_buf, ConstArrayView<Int32> send_count,
737 ConstArrayView<Int32> send_index, Span<Type> recv_buf,
738 ConstArrayView<Int32> recv_count, ConstArrayView<Int32> recv_index)
740 ARCANE_UNUSED(send_buf);
741 ARCANE_UNUSED(recv_buf);
742 ARCANE_UNUSED(send_count);
743 ARCANE_UNUSED(recv_count);
744 ARCANE_UNUSED(send_index);
745 ARCANE_UNUSED(recv_index);
746 throw NotImplementedException(A_FUNCINFO);
// Scalar scan: not implemented; always throws NotImplementedException.
// The array overload of scan() is implemented.
752template <
class Type>
Type SharedMemoryParallelDispatch<Type>::
753scan(eReduceType op,
Type send_buf)
756 ARCANE_UNUSED(send_buf);
757 throw NotImplementedException(A_FUNCINFO);
// In-place inclusive scan over ranks: element j on rank r becomes the
// reduction of element j over ranks 0..r. Forwards to _allReduceOrScan() with
// is_scan = true.
763template <
class Type>
void SharedMemoryParallelDispatch<Type>::
764scan(eReduceType op, ArrayView<Type> send_buf)
766 _allReduceOrScan(op, send_buf,
true);
// Unimplemented member of SharedMemoryParallelDispatch<Type>; always throws
// NotImplementedException.
// NOTE(review): its name and parameter list are missing from this listing.
772template <
class Type>
void SharedMemoryParallelDispatch<Type>::
776 throw NotImplementedException(A_FUNCINFO);
// Unimplemented Request-returning member of SharedMemoryParallelDispatch<Type>;
// always throws NotImplementedException.
// NOTE(review): its name and parameter list are missing from this listing.
782template <
class Type> Request SharedMemoryParallelDispatch<Type>::
785 throw NotImplementedException(A_FUNCINFO);
// Explicit instantiations of the typed dispatcher for every supported data
// type. Types without a TrueType _ThreadIntegralType specialisation (e.g. char,
// long double, APReal, Real2/Real3 and the matrix types) are instantiated, but
// their computeMinMaxSum() throws NotImplementedException.
791template class SharedMemoryParallelDispatch<char>;
792template class SharedMemoryParallelDispatch<signed char>;
793template class SharedMemoryParallelDispatch<unsigned char>;
794template class SharedMemoryParallelDispatch<short>;
795template class SharedMemoryParallelDispatch<unsigned short>;
796template class SharedMemoryParallelDispatch<int>;
797template class SharedMemoryParallelDispatch<unsigned int>;
798template class SharedMemoryParallelDispatch<long>;
799template class SharedMemoryParallelDispatch<unsigned long>;
800template class SharedMemoryParallelDispatch<long long>;
801template class SharedMemoryParallelDispatch<unsigned long long>;
802template class SharedMemoryParallelDispatch<float>;
803template class SharedMemoryParallelDispatch<double>;
804template class SharedMemoryParallelDispatch<long double>;
805template class SharedMemoryParallelDispatch<APReal>;
806template class SharedMemoryParallelDispatch<Real2>;
807template class SharedMemoryParallelDispatch<Real3>;
808template class SharedMemoryParallelDispatch<Real2x2>;
809template class SharedMemoryParallelDispatch<Real3x3>;
810template class SharedMemoryParallelDispatch<HPReal>;
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
#define ARCCORE_THROW(exception_class,...)
Macro to throw an exception with formatting.
Memory and allocator management functions.
Brief information for a 'gather' message for data type DataType.
Interface of a message queue with threads.
Thread-based parallelism manager.
Declarations of types and methods used by message exchange mechanisms.
Int32 Integer
Type representing an integer.
UniqueArray< Int32 > Int32UniqueArray
Dynamic 1D array of 32-bit integers.