16#include "arccore/message_passing_mpi/internal/MpiMultiMachineShMemWinBaseInternal.h"
18#include "arccore/base/FatalErrorException.h"
23namespace Arcane::MessagePassing::Mpi
// Constructor fragment of MpiMultiMachineShMemWinBaseInternal (signature and several
// interior lines are not visible in this listing).
//
// Visible steps:
//  - store the machine communicator, its size and rank, the element size, the number
//    of segments per process and m_machine_ranks (world ranks of the machine's processes);
//  - check m_sizeof_type > 0, that every sizeof_segments[i] is >= 0 and a multiple of
//    m_sizeof_type, and that m_nb_segments_per_proc > 0;
//  - create one shared window per (machine rank, segment) pair in m_all_mpi_win, stored
//    at index segment + machine_rank * m_nb_segments_per_proc;
//  - create three helper windows (m_win_need_resize, m_win_actual_sizeof,
//    m_win_target_segments) holding m_nb_segments_per_proc entries per process;
//  - free both MPI_Info objects and synchronize on m_comm_machine.
35, m_comm_machine(comm_machine)
36, m_comm_machine_size(comm_machine_size)
37, m_comm_machine_rank(comm_machine_rank)
38, m_sizeof_type(sizeof_type)
39, m_nb_segments_per_proc(nb_segments_per_proc)
40, m_machine_ranks(machine_ranks)
44 if (m_sizeof_type <= 0) {
// Each requested segment size must be a non-negative multiple of the element size.
47 for (
Integer i = 0; i < m_nb_segments_per_proc; ++i) {
48 if (sizeof_segments[i] < 0 || sizeof_segments[i] % m_sizeof_type != 0) {
// NOTE(review): m_nb_segments_per_proc is validated after the loop that uses it; this is
// harmless (the loop does not run when it is <= 0) but validating first would be clearer.
52 if (m_nb_segments_per_proc <= 0) {
// One window per segment of every process of the machine.
55 m_all_mpi_win.resize(m_comm_machine_size * m_nb_segments_per_proc);
58 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
// Info used for the per-segment windows: "alloc_shared_noncontig" = "true".
63 MPI_Info win_info_true;
64 MPI_Info_create(&win_info_true);
65 MPI_Info_set(win_info_true,
"alloc_shared_noncontig",
"true");
// Info used for the helper windows: "alloc_shared_noncontig" = "false", so the parts of
// all processes are contiguous (checked below with ptr_win + pos_my_wins).
67 MPI_Info win_info_false;
68 MPI_Info_create(&win_info_false);
69 MPI_Info_set(win_info_false,
"alloc_shared_noncontig",
"false");
// Index of this process' first entry in the per-machine layout.
71 const Int32 pos_my_wins = m_comm_machine_rank * m_nb_segments_per_proc;
// MPI_Win_allocate_shared is collective over m_comm_machine: every process takes part in
// the creation of every (i, j) window.
75 for (
Integer i = 0; i < m_comm_machine_size; ++i) {
76 for (
Integer j = 0; j < m_nb_segments_per_proc; ++j) {
78 if (m_comm_machine_rank == i) {
// The owner requests sizeof_segments[j] bytes; an empty segment is given m_sizeof_type
// bytes instead (presumably to avoid a zero-sized part). The size used by non-owners is
// set on a line not visible here.
79 if (sizeof_segments[j] == 0)
80 size_seg = m_sizeof_type;
82 size_seg = sizeof_segments[j];
84 std::byte* ptr_seg =
nullptr;
85 int error = MPI_Win_allocate_shared(size_seg, m_sizeof_type, win_info_true, m_comm_machine, &ptr_seg, &
m_all_mpi_win[j + i * m_nb_segments_per_proc]);
// NOTE(review): the error branches are not visible; if they throw, win_info_true and
// win_info_false are never freed — confirm.
87 if (error != MPI_SUCCESS) {
// Query the base address and size of each of this process' own segments.
93 for (
Integer i = 0; i < m_nb_segments_per_proc; ++i) {
96 std::byte* ptr_seg =
nullptr;
97 int error = MPI_Win_shared_query(
m_all_mpi_win[i + pos_my_wins], m_comm_machine_rank, &size_seg, &
size_type, &ptr_seg);
99 if (error != MPI_SUCCESS) {
// Helper window m_win_need_resize: m_nb_segments_per_proc Int64 entries per process.
111 Int64* ptr_seg =
nullptr;
112 Int64* ptr_win =
nullptr;
114 int error = MPI_Win_allocate_shared(
static_cast<Int64>(
sizeof(
Int64)) * m_nb_segments_per_proc,
sizeof(
Int64), win_info_false, m_comm_machine, &ptr_seg, &
m_win_need_resize);
116 if (error != MPI_SUCCESS) {
125 if (error != MPI_SUCCESS) {
131 for (
Integer i = 0; i < m_nb_segments_per_proc; ++i) {
// Checks that this process' part starts at ptr_win + pos_my_wins, i.e. that the window
// is contiguous (ptr_win is set on a line not visible here).
135 if (ptr_win + pos_my_wins != ptr_seg) {
// Helper window m_win_actual_sizeof: m_nb_segments_per_proc Int64 entries per process.
141 Int64* ptr_seg =
nullptr;
142 Int64* ptr_win =
nullptr;
144 int error = MPI_Win_allocate_shared(
static_cast<Int64>(
sizeof(
Int64)) * m_nb_segments_per_proc,
sizeof(
Int64), win_info_false, m_comm_machine, &ptr_seg, &
m_win_actual_sizeof);
146 if (error != MPI_SUCCESS) {
155 if (error != MPI_SUCCESS) {
161 for (
Integer i = 0; i < m_nb_segments_per_proc; ++i) {
165 if (ptr_win + pos_my_wins != ptr_seg) {
// Helper window m_win_target_segments: m_nb_segments_per_proc Int32 entries per process.
171 Int32* ptr_seg =
nullptr;
172 Int32* ptr_win =
nullptr;
174 int error = MPI_Win_allocate_shared(
static_cast<Int64>(
sizeof(
Int32)) * m_nb_segments_per_proc,
sizeof(
Int32), win_info_false, m_comm_machine, &ptr_seg, &
m_win_target_segments);
176 if (error != MPI_SUCCESS) {
185 if (error != MPI_SUCCESS) {
191 for (
Integer i = 0; i < m_nb_segments_per_proc; ++i) {
195 if (ptr_win + pos_my_wins != ptr_seg) {
200 MPI_Info_free(&win_info_false);
201 MPI_Info_free(&win_info_true);
// All processes of the machine leave the constructor together.
203 MPI_Barrier(m_comm_machine);
// Destructor fragment: iterates over every window of m_all_mpi_win
// (m_comm_machine_size * m_nb_segments_per_proc entries).
// NOTE(review): the loop body is not visible — presumably MPI_Win_free on each window.
// Confirm that the helper windows (m_win_need_resize, m_win_actual_sizeof,
// m_win_target_segments) are freed as well.
209MpiMultiMachineShMemWinBaseInternal::
210~MpiMultiMachineShMemWinBaseInternal()
212 for (
Integer i = 0; i < m_comm_machine_size * m_nb_segments_per_proc; ++i) {
// Presumably sizeofOneElem(): size in bytes of one element of the window (m_sizeof_type,
// checked to be > 0 in the constructor).
226 return m_sizeof_type;
// Presumably machineRanks(): world ranks of the processes of m_comm_machine, indexed by
// machine rank (the layout used by _worldToMachine/_machineToWorld).
235 return m_machine_ranks;
// Presumably barrier(): blocks until every process of m_comm_machine reaches this call.
244 MPI_Barrier(m_comm_machine);
// Presumably segmentView(num_seg) fragment: index of this process' segment num_seg in the
// per-machine layout (segment + machine_rank * m_nb_segments_per_proc).
253 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
// Presumably segmentView(rank, num_seg) fragment: view of segment num_seg owned by the
// process of world rank `rank`.
// Fix: MPI_Win_shared_query takes a rank in the window's group, i.e. in m_comm_machine,
// not a world rank. The index below already converts `rank` with _worldToMachine(), and
// the query now does the same. Passing the world rank was only correct when world and
// machine ranks happened to coincide (e.g. on the first node).
263 const Int32 segment_infos_pos = num_seg + _worldToMachine(rank) * m_nb_segments_per_proc;
267 std::byte* ptr_seg =
nullptr;
268 int error = MPI_Win_shared_query(
m_all_mpi_win[segment_infos_pos], _worldToMachine(rank), &size_seg, &
size_type, &ptr_seg);
270 if (error != MPI_SUCCESS) {
// Presumably segmentConstView(num_seg) fragment: index of this process' segment num_seg
// in the per-machine layout (segment + machine_rank * m_nb_segments_per_proc).
283 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
// Presumably segmentConstView(rank, num_seg) fragment: read-only view of segment num_seg
// owned by the process of world rank `rank`.
// Fix: MPI_Win_shared_query takes a rank in the window's group (m_comm_machine), so the
// world rank is converted with _worldToMachine(), as is already done for the index.
293 const Int32 segment_infos_pos = num_seg + _worldToMachine(rank) * m_nb_segments_per_proc;
297 std::byte* ptr_seg =
nullptr;
298 int error = MPI_Win_shared_query(
m_all_mpi_win[segment_infos_pos], _worldToMachine(rank), &size_seg, &
size_type, &ptr_seg);
300 if (error != MPI_SUCCESS) {
// Presumably requestAdd(num_seg, elem) fragment (branch bodies not visible):
//  - elem's byte size must be a multiple of m_sizeof_type;
//  - an empty or null elem is handled separately;
//  - computes the used size after the addition, and takes a separate branch when it
//    exceeds the reserved size (presumably a reallocation request — confirm);
//  - flags a pending addition (m_add_requested), performed later by executeAdd().
313 if (elem.
size() % m_sizeof_type) {
316 if (elem.
empty() || elem.
data() ==
nullptr) {
320 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
323 const Int64 future_sizeof_win = actual_sizeof_win + elem.
size();
326 if (future_sizeof_win > old_reserved) {
334 m_add_requested =
true;
// Presumably executeAdd() fragment. Checks whether an addition was requested (branch
// body not visible) and clears the flag. Then, for each of this process' segments, it
// walks window positions [actual_sizeof_win, future_sizeof_win) alongside request
// positions starting at 0; the copy statement is not visible.
345 if (!m_add_requested) {
348 m_add_requested =
false;
350 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
355 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
364 for (
Int64 pos_win = actual_sizeof_win, pos_elem = 0; pos_win < future_sizeof_win; ++pos_win, ++pos_elem) {
// Presumably requestAddToAnotherSegment(thread, rank, num_seg, elem) fragment. It requests
// appending elem to segment num_seg of the process with world rank `rank`. The request is
// recorded in this process' slot `thread`; the recording lines are not visible.
379 if (elem.
size() % m_sizeof_type) {
382 if (elem.
empty() || elem.
data() ==
nullptr) {
// Target segment index, using the machine rank of the target process.
386 const Int32 machine_rank = _worldToMachine(rank);
387 const Int32 target_segment_infos_pos = num_seg + machine_rank * m_nb_segments_per_proc;
// Slot of this process' segment `thread`.
390 const Int32 segment_infos_pos = thread + m_comm_machine_rank * m_nb_segments_per_proc;
397 std::byte* ptr_seg =
nullptr;
// The query uses the machine rank, as MPI_Win_shared_query expects a rank of the
// window's group (m_comm_machine).
399 int error = MPI_Win_shared_query(
m_all_mpi_win[target_segment_infos_pos], machine_rank, &size_seg, &
size_type, &ptr_seg);
401 if (error != MPI_SUCCESS) {
408 const Int64 future_sizeof_win = actual_sizeof_win + elem.
size();
409 const Int64 old_reserved = rank_reserved_part_span.
size();
411 if (future_sizeof_win > old_reserved) {
419 m_add_requested =
true;
// Presumably executeAddToAnotherSegment() fragment (several lines not visible):
//  - synchronizes, then records which of this process' segments will be edited by
//    another process (is_my_seg_edited, one flag per own segment);
//  - aborts if two requests target the same segment;
//  - copies each pending request into the target segment, failing if the reserved part
//    is too small;
//  - synchronizes again and refreshes the views of the edited own segments.
// Fixes:
//  - is_my_seg_edited is indexed by num_seg (< m_nb_segments_per_proc), so it must hold
//    m_nb_segments_per_proc flags. It was sized m_comm_machine_size, which overflowed
//    whenever a process has more segments than the node has processes.
//  - Arccore format placeholders are 0-based: {0}, {1}.
428 MPI_Barrier(m_comm_machine);
433 auto is_my_seg_edited = std::make_unique<bool[]>(m_nb_segments_per_proc);
434 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
436 if (rank_asked == m_comm_machine_rank) {
437 is_my_seg_edited[num_seg] =
true;
445 if (!m_add_requested) {
450 m_add_requested =
false;
453 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
454 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
456 if (seg_needs_to_edit == -1)
459 bool is_found =
false;
461 if (rank_asked == seg_needs_to_edit) {
466 ARCCORE_FATAL(
"Two subdomains ask same rank for addToAnotherSegment()");
475 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
480 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
482 if (target_segment_infos_pos == -1) {
493 std::byte* ptr_seg =
nullptr;
// target_segment_infos_pos / m_nb_segments_per_proc is the machine rank owning the target.
495 int error = MPI_Win_shared_query(
m_all_mpi_win[target_segment_infos_pos], target_segment_infos_pos / m_nb_segments_per_proc, &size_seg, &
size_type, &ptr_seg);
497 if (error != MPI_SUCCESS) {
503 if (rank_reserved_part_span.
size() < future_sizeof_win) {
504 ARCCORE_FATAL(
"Bad realloc -- New size : {0} -- Needed size : {1}", rank_reserved_part_span.
size(), future_sizeof_win);
508 for (
Int64 pos_win = actual_sizeof_win, pos_elem = 0; pos_win < future_sizeof_win; ++pos_win, ++pos_elem) {
509 rank_reserved_part_span[pos_win] =
m_add_requests[num_seg][pos_elem];
517 MPI_Barrier(m_comm_machine);
520 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
521 if (is_my_seg_edited[num_seg]) {
522 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
525 std::byte* ptr_seg =
nullptr;
527 int error = MPI_Win_shared_query(
m_all_mpi_win[segment_infos_pos], m_comm_machine_rank, &size_seg, &
size_type, &ptr_seg);
529 if (error != MPI_SUCCESS) {
// Presumably requestReserve(num_seg, new_capacity) fragment: new_capacity must be a
// multiple of m_sizeof_type.
// NOTE(review): no negative-capacity check is visible here, unlike requestResize — confirm
// whether one exists on a missing line.
543 if (new_capacity % m_sizeof_type) {
547 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
// Presumably requestResize(num_seg, new_size) fragment:
//  - new_size == -1 takes a dedicated branch (body not visible);
//  - otherwise new_size must be >= 0 and a multiple of m_sizeof_type;
//  - the request is flagged in m_resize_requested for executeResize().
572 if (new_size == -1) {
575 if (new_size < 0 || new_size % m_sizeof_type) {
579 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
589 m_resize_requested =
true;
// Presumably executeResize() fragment. Checks whether a resize was requested (branch
// body not visible), clears the flag, then makes two passes over this process' segments.
600 if (!m_resize_requested) {
603 m_resize_requested =
false;
605 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
610 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
627 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
628 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
// Member function fragment whose name is on a missing line (possibly executeShrink or
// executeReserve — confirm). Visible: barrier on m_comm_machine, a loop over this
// process' segments, then a second barrier.
661void MpiMultiMachineShMemWinBaseInternal::
666 MPI_Barrier(m_comm_machine);
672 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
673 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
682 MPI_Barrier(m_comm_machine);
// Member function fragment (name on a missing line) that reallocates segment windows.
// For every (machine rank, segment) pair, it collectively creates a new shared window:
//  - the owner requests m_need_resize[...] bytes (m_sizeof_type when that is 0) and other
//    processes request 0;
//  - the owner copies min_size bytes from the old segment into the new one, and old_win
//    is freed.
// Afterwards it re-queries this process' own segments.
// NOTE(review): on the error paths old_win is freed, but win_info is only freed after the
// loop; if those branches throw, win_info leaks — confirm.
688void MpiMultiMachineShMemWinBaseInternal::
692 MPI_Info_create(&win_info);
693 MPI_Info_set(win_info,
"alloc_shared_noncontig",
"true");
696 for (
Integer rank = 0; rank < m_comm_machine_size; ++rank) {
697 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
699 const Int32 local_segment_infos_pos = num_seg + rank * m_nb_segments_per_proc;
704 ARCCORE_ASSERT(
m_need_resize[local_segment_infos_pos] >= 0, (
"New size must be >= 0"));
705 ARCCORE_ASSERT(
m_need_resize[local_segment_infos_pos] % m_sizeof_type == 0, (
"New size must be % sizeof type"));
// Only the owning process contributes memory to this window.
709 const Int64 size_seg = (m_comm_machine_rank == rank ? (
m_need_resize[local_segment_infos_pos] == 0 ? m_sizeof_type :
m_need_resize[local_segment_infos_pos]) : 0);
713 std::byte* ptr_new_seg =
nullptr;
716 int error = MPI_Win_allocate_shared(size_seg, m_sizeof_type, win_info, m_comm_machine, &ptr_new_seg, &
m_all_mpi_win[local_segment_infos_pos]);
717 if (error != MPI_SUCCESS) {
718 MPI_Win_free(&old_win);
723 if (m_comm_machine_rank == rank) {
729 std::byte* ptr_old_seg =
nullptr;
730 MPI_Aint mpi_reserved_size_new_seg;
734 MPI_Aint size_old_seg;
738 error = MPI_Win_shared_query(old_win, m_comm_machine_rank, &size_old_seg, &size_type, &ptr_old_seg);
739 if (error != MPI_SUCCESS || ptr_old_seg ==
nullptr) {
740 MPI_Win_free(&old_win);
747 std::byte* ptr_seg =
nullptr;
// Sanity check: the queried base of the new segment must match the allocated pointer.
751 error = MPI_Win_shared_query(
m_all_mpi_win[local_segment_infos_pos], m_comm_machine_rank, &mpi_reserved_size_new_seg, &size_type, &ptr_seg);
752 if (error != MPI_SUCCESS || ptr_seg ==
nullptr || ptr_seg != ptr_new_seg) {
753 MPI_Win_free(&old_win);
762 memcpy(ptr_new_seg, ptr_old_seg, min_size);
765 MPI_Win_free(&old_win);
768 MPI_Info_free(&win_info);
// Re-query this process' segments now that the windows have been replaced.
771 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
772 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
776 std::byte* ptr_seg =
nullptr;
777 int error = MPI_Win_shared_query(
m_all_mpi_win[segment_infos_pos], m_comm_machine_rank, &size_seg, &size_type, &ptr_seg);
779 if (error != MPI_SUCCESS) {
// Converts a world rank to its rank in m_comm_machine by a linear search of
// m_machine_ranks (O(m_comm_machine_size)). The return on a match and the behavior when
// `world` is not found are on lines not visible here.
790Int32 MpiMultiMachineShMemWinBaseInternal::
791_worldToMachine(
Int32 world)
const
793 for (
Int32 i = 0; i < m_comm_machine_size; ++i) {
794 if (m_machine_ranks[i] == world) {
// Converts a rank in m_comm_machine to its world rank via m_machine_ranks.
// No bounds check is visible: `machine` is expected in [0, m_comm_machine_size).
804Int32 MpiMultiMachineShMemWinBaseInternal::
805_machineToWorld(
Int32 machine)
const
807 return m_machine_ranks[machine];
#define ARCCORE_FATAL(...)
Macro throwing a FatalErrorException.
Constant view of an array of type T.
void requestAdd(Int32 num_seg, Span< const std::byte > elem)
Method to request the addition of elements into one of our segments.
Span< const std::byte > segmentConstView(Int32 num_seg) const
Method to get a constant view of one of our segments.
void executeResize()
Method to execute the resizing requests.
void executeShrink()
Method to reduce the reserved memory space for the segments to the minimum necessary.
void requestReserve(Int32 num_seg, Int64 new_capacity)
Method to request the reservation of memory space for one of our segments.
void requestAddToAnotherSegment(Int32 thread, Int32 rank, Int32 num_seg, Span< const std::byte > elem)
Method to request the addition of elements into one of the segments of the window.
void executeReserve()
Method to execute the reservation requests.
void _requestRealloc(Int32 owner_pos_segment, Int64 new_capacity) const
Method to request a reallocation.
MpiMultiMachineShMemWinBaseInternal(SmallSpan< Int64 > sizeof_segments, Int32 nb_segments_per_proc, Int32 sizeof_type, const MPI_Comm &comm_machine, Int32 comm_machine_rank, Int32 comm_machine_size, ConstArrayView< Int32 > machine_ranks)
sizeof_segments is not retained by the object: it only needs to remain valid for the duration of the constructor call.
void executeAdd()
Method to execute the addition requests.
void requestResize(Int32 num_seg, Int64 new_size)
Method to request the resizing of one of our segments.
Span< Int32 > m_target_segments
void barrier() const
Method that blocks until every process on the node has called it, then lets execution continue.
void executeAddToAnotherSegment()
Method to execute the addition requests in the segments of other processes.
MPI_Win m_win_actual_sizeof
Contiguous window with main window sizes.
UniqueArray< MPI_Win > m_all_mpi_win
Span< Int64 > m_need_resize
UniqueArray< Span< std::byte > > m_reserved_part_span
UniqueArray< Span< const std::byte > > m_add_requests
Int32 sizeofOneElem() const
Method to get the size of an element in the window.
UniqueArray< Int64 > m_resize_requests
Span< Int64 > m_sizeof_used_part
MPI_Win m_win_need_resize
Span< std::byte > segmentView(Int32 num_seg)
Method to get a view of one of our segments.
MPI_Win m_win_target_segments
ConstArrayView< Int32 > machineRanks() const
Method to get the ranks that own a segment in the window.
View of an array of elements of type T.
constexpr __host__ __device__ pointer data() const noexcept
Pointer to the start of the view.
constexpr __host__ __device__ bool empty() const noexcept
Returns true if the array is empty (zero dimension).
constexpr __host__ __device__ SizeType size() const noexcept
Returns the size of the array.
View of an array of elements of type T.
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
std::int32_t Int32
Signed integer type of 32 bits.