15#include "arccore/message_passing_mpi/internal/MpiMultiMachineShMemWinBaseInternal.h"
17#include "arccore/base/FatalErrorException.h"
22namespace Arcane::MessagePassing::Mpi
// Constructor. On the machine communicator, creates one shared-memory window
// per (machine rank, segment) pair, plus three small bookkeeping windows that
// hold, for every segment of every rank: the pending reallocation size, the
// number of bytes currently used, and the target of a pending "add to another
// segment" request.
//
// sizeof_segments      : initial size of each local segment; each value must be
//                        >= 0 and a multiple of sizeof_type.
// nb_segments_per_proc : number of segments owned by each rank.
// sizeof_type          : size of one element, also used as the window
//                        displacement unit.
// comm_machine / comm_machine_rank / comm_machine_size : machine communicator
//                        and this process' rank / the size of that communicator.
// machine_ranks        : stored as-is; used by _worldToMachine()/_machineToWorld().
//
// NOTE(review): the statements executed when a check or an MPI call fails are
// not visible in this listing.
29MpiMultiMachineShMemWinBaseInternal::
30MpiMultiMachineShMemWinBaseInternal(SmallSpan<Int64> sizeof_segments,
Int32 nb_segments_per_proc,
Int32 sizeof_type,
const MPI_Comm& comm_machine,
Int32 comm_machine_rank,
Int32 comm_machine_size, ConstArrayView<Int32> machine_ranks)
32, m_win_actual_sizeof()
33, m_win_target_segments()
34, m_comm_machine(comm_machine)
35, m_comm_machine_size(comm_machine_size)
36, m_comm_machine_rank(comm_machine_rank)
37, m_sizeof_type(sizeof_type)
38, m_nb_segments_per_proc(nb_segments_per_proc)
39, m_machine_ranks(machine_ranks)
40, m_add_requests(nb_segments_per_proc)
41, m_resize_requests(nb_segments_per_proc)
// --- Argument validation ---
// NOTE(review): nb_segments_per_proc is only checked (<= 0) after it has
// already sized m_add_requests/m_resize_requests above and bounded the loop
// below; sizeof_segments is indexed without comparing against its own size.
43 if (m_sizeof_type <= 0) {
46 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
47 if (sizeof_segments[i] < 0 || sizeof_segments[i] % m_sizeof_type != 0) {
51 if (m_nb_segments_per_proc <= 0) {
// One window handle per (rank, segment); one reserved-memory span per local segment.
54 m_all_mpi_win.resize(m_comm_machine_size * m_nb_segments_per_proc);
55 m_reserved_part_span.resize(m_nb_segments_per_proc);
// No add/resize request is pending initially (empty span / -1).
57 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
58 m_add_requests[num_seg] = Span<const std::byte>{
nullptr, 0 };
59 m_resize_requests[num_seg] = -1;
// Two MPI_Info objects differing only by "alloc_shared_noncontig":
// "true" is used for the data segments, "false" for the bookkeeping windows.
62 MPI_Info win_info_true;
63 MPI_Info_create(&win_info_true);
64 MPI_Info_set(win_info_true,
"alloc_shared_noncontig",
"true");
66 MPI_Info win_info_false;
67 MPI_Info_create(&win_info_false);
68 MPI_Info_set(win_info_false,
"alloc_shared_noncontig",
"false");
// Index of this rank's first entry in every "rank-major" array
// (entry of segment j of rank i is j + i * m_nb_segments_per_proc).
70 const Int32 pos_my_wins = m_comm_machine_rank * m_nb_segments_per_proc;
// Collective creation of every data window. For its own windows a rank asks
// for sizeof_segments[j] bytes, or m_sizeof_type bytes when that size is 0.
// NOTE(review): the size requested for windows owned by other ranks is set on
// a line not visible here — presumably 0; confirm.
74 for (Integer i = 0; i < m_comm_machine_size; ++i) {
75 for (Integer j = 0; j < m_nb_segments_per_proc; ++j) {
77 if (m_comm_machine_rank == i) {
78 if (sizeof_segments[j] == 0)
79 size_seg = m_sizeof_type;
81 size_seg = sizeof_segments[j];
83 std::byte* ptr_seg =
nullptr;
84 int error = MPI_Win_allocate_shared(size_seg, m_sizeof_type, win_info_true, m_comm_machine, &ptr_seg, &m_all_mpi_win[j + i * m_nb_segments_per_proc]);
86 if (error != MPI_SUCCESS) {
// Query the address and reserved size of each local segment and cache them.
92 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
95 std::byte* ptr_seg =
nullptr;
96 int error = MPI_Win_shared_query(m_all_mpi_win[i + pos_my_wins], m_comm_machine_rank, &size_seg, &size_type, &ptr_seg);
98 if (error != MPI_SUCCESS) {
105 m_reserved_part_span[i] = Span<std::byte>{ ptr_seg, size_seg };
// --- Bookkeeping window 1: pending reallocation sizes (Int64, -1 = none) ---
// Every rank contributes m_nb_segments_per_proc entries; the base address of
// rank 0 is queried and viewed as one array covering all ranks.
110 Int64* ptr_seg =
nullptr;
111 Int64* ptr_win =
nullptr;
113 int error = MPI_Win_allocate_shared(
static_cast<Int64
>(
sizeof(Int64)) * m_nb_segments_per_proc,
sizeof(Int64), win_info_false, m_comm_machine, &ptr_seg, &m_win_need_resize);
115 if (error != MPI_SUCCESS) {
122 int error = MPI_Win_shared_query(m_win_need_resize, 0, &size_seg, &size_type, &ptr_win);
124 if (error != MPI_SUCCESS) {
128 m_need_resize = Span<Int64>{ ptr_win, m_comm_machine_size * m_nb_segments_per_proc };
130 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
131 m_need_resize[i + pos_my_wins] = -1;
// Sanity check: this rank's own part must sit at offset pos_my_wins of the
// global array, i.e. the window memory must be contiguous across ranks.
134 if (ptr_win + pos_my_wins != ptr_seg) {
// --- Bookkeeping window 2: used size of every segment (Int64) ---
140 Int64* ptr_seg =
nullptr;
141 Int64* ptr_win =
nullptr;
143 int error = MPI_Win_allocate_shared(
static_cast<Int64
>(
sizeof(Int64)) * m_nb_segments_per_proc,
sizeof(Int64), win_info_false, m_comm_machine, &ptr_seg, &m_win_actual_sizeof);
145 if (error != MPI_SUCCESS) {
152 int error = MPI_Win_shared_query(m_win_actual_sizeof, 0, &size_seg, &size_type, &ptr_win);
154 if (error != MPI_SUCCESS) {
158 m_sizeof_used_part = Span<Int64>{ ptr_win, m_comm_machine_size * m_nb_segments_per_proc };
// The used size starts at the requested size (which may be 0 even though
// m_sizeof_type bytes were reserved for such a segment).
160 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
161 m_sizeof_used_part[i + pos_my_wins] = sizeof_segments[i];
164 if (ptr_win + pos_my_wins != ptr_seg) {
// --- Bookkeeping window 3: target of a pending cross-segment add (Int32, -1 = none) ---
170 Int32* ptr_seg =
nullptr;
171 Int32* ptr_win =
nullptr;
173 int error = MPI_Win_allocate_shared(
static_cast<Int64
>(
sizeof(Int32)) * m_nb_segments_per_proc,
sizeof(Int32), win_info_false, m_comm_machine, &ptr_seg, &m_win_target_segments);
175 if (error != MPI_SUCCESS) {
182 int error = MPI_Win_shared_query(m_win_target_segments, 0, &size_seg, &size_type, &ptr_win);
184 if (error != MPI_SUCCESS) {
188 m_target_segments = Span<Int32>{ ptr_win, m_comm_machine_size * m_nb_segments_per_proc };
190 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
191 m_target_segments[i + pos_my_wins] = -1;
194 if (ptr_win + pos_my_wins != ptr_seg) {
// The info objects are no longer needed once all windows exist.
199 MPI_Info_free(&win_info_false);
200 MPI_Info_free(&win_info_true);
// Wait until every rank has initialised its part of the bookkeeping arrays.
202 MPI_Barrier(m_comm_machine);
208MpiMultiMachineShMemWinBaseInternal::
209~MpiMultiMachineShMemWinBaseInternal()
211 for (Integer i = 0; i < m_comm_machine_size * m_nb_segments_per_proc; ++i) {
212 MPI_Win_free(&m_all_mpi_win[i]);
214 MPI_Win_free(&m_win_need_resize);
215 MPI_Win_free(&m_win_actual_sizeof);
216 MPI_Win_free(&m_win_target_segments);
// Accessor returning m_sizeof_type, the size of one element given to the
// constructor.
// NOTE(review): the line carrying the method name is not visible in this listing.
222Int32 MpiMultiMachineShMemWinBaseInternal::
225 return m_sizeof_type;
// Accessor returning the m_machine_ranks view stored by the constructor.
// NOTE(review): the line carrying the method name is not visible in this listing.
231ConstArrayView<Int32> MpiMultiMachineShMemWinBaseInternal::
234 return m_machine_ranks;
// Blocks until every process of the machine communicator reaches this call.
// NOTE(review): the line carrying the method name is not visible in this listing.
240void MpiMultiMachineShMemWinBaseInternal::
243 MPI_Barrier(m_comm_machine);
249Span<std::byte> MpiMultiMachineShMemWinBaseInternal::
250segmentView(Int32 num_seg)
252 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
253 return m_reserved_part_span[num_seg].subSpan(0, m_sizeof_used_part[segment_infos_pos]);
259Span<std::byte> MpiMultiMachineShMemWinBaseInternal::
260segmentView(Int32 rank, Int32 num_seg)
262 const Int32 segment_infos_pos = num_seg + _worldToMachine(rank) * m_nb_segments_per_proc;
266 std::byte* ptr_seg =
nullptr;
267 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], rank, &size_seg, &size_type, &ptr_seg);
269 if (error != MPI_SUCCESS) {
273 return Span<std::byte>{ ptr_seg, m_sizeof_used_part[segment_infos_pos] };
279Span<const std::byte> MpiMultiMachineShMemWinBaseInternal::
280segmentConstView(Int32 num_seg)
const
282 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
283 return m_reserved_part_span[num_seg].subSpan(0, m_sizeof_used_part[segment_infos_pos]);
289Span<const std::byte> MpiMultiMachineShMemWinBaseInternal::
290segmentConstView(Int32 rank, Int32 num_seg)
const
292 const Int32 segment_infos_pos = num_seg + _worldToMachine(rank) * m_nb_segments_per_proc;
296 std::byte* ptr_seg =
nullptr;
297 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], rank, &size_seg, &size_type, &ptr_seg);
299 if (error != MPI_SUCCESS) {
303 return Span<const std::byte>{ ptr_seg, m_sizeof_used_part[segment_infos_pos] };
// Registers a request to append `elem` at the end of local segment `num_seg`.
// Nothing is copied here: only the span is stored in m_add_requests, so the
// bytes it refers to must stay valid until the request is executed.
// A reallocation request is posted when the segment's reserved memory is too
// small for the future size, and cleared otherwise.
309void MpiMultiMachineShMemWinBaseInternal::
310requestAdd(Int32 num_seg, Span<const std::byte> elem)
// The byte count must be a whole number of elements.
312 if (elem.size() % m_sizeof_type) {
// Nothing to add.
315 if (elem.empty() || elem.data() ==
nullptr) {
319 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
321 const Int64 actual_sizeof_win = m_sizeof_used_part[segment_infos_pos];
322 const Int64 future_sizeof_win = actual_sizeof_win + elem.size();
323 const Int64 old_reserved = m_reserved_part_span[num_seg].size();
// Ask for a larger window only when the reserved part cannot hold the result.
325 if (future_sizeof_win > old_reserved) {
326 _requestRealloc(segment_infos_pos, future_sizeof_win);
329 _requestRealloc(segment_infos_pos);
332 m_add_requests[num_seg] = elem;
333 m_add_requested =
true;
// Executes the pending add requests of the local segments: for each segment
// with a non-empty request, copies the requested bytes right after the used
// part, updates the used size and clears the request.
// NOTE(review): the line carrying the method name and the statements between
// the visible ones are not shown in this listing.
339void MpiMultiMachineShMemWinBaseInternal::
// Fast exit when no add was requested since the last execution.
344 if (!m_add_requested) {
347 m_add_requested =
false;
349 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
350 if (m_add_requests[num_seg].empty() || m_add_requests[num_seg].data() ==
nullptr) {
354 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
356 const Int64 actual_sizeof_win = m_sizeof_used_part[segment_infos_pos];
357 const Int64 future_sizeof_win = actual_sizeof_win + m_add_requests[num_seg].size();
// The reserved memory must already be large enough at this point.
// NOTE(review): the placeholders below are {1}/{2} whereas the same message in
// the resize path uses {0}/{1} with two arguments; one of the two is likely
// wrong — confirm against the formatter's indexing.
359 if (m_reserved_part_span[num_seg].size() < future_sizeof_win) {
360 ARCCORE_FATAL(
"Bad realloc -- New size : {1} -- Needed size : {2}", m_reserved_part_span[num_seg].size(), future_sizeof_win);
// Byte-by-byte copy of the request after the currently used part.
363 for (Int64 pos_win = actual_sizeof_win, pos_elem = 0; pos_win < future_sizeof_win; ++pos_win, ++pos_elem) {
364 m_reserved_part_span[num_seg][pos_win] = m_add_requests[num_seg][pos_elem];
366 m_sizeof_used_part[segment_infos_pos] = future_sizeof_win;
// Mark the request as consumed.
368 m_add_requests[num_seg] = Span<const std::byte>{
nullptr, 0 };
// Registers a request to append `elem` to segment `num_seg` of the process
// whose world rank is `rank`.
//
// thread  : index of the local slot carrying the request (m_add_requests[thread]
//           and the matching entry of m_target_segments).
// rank    : world rank of the target; converted with _worldToMachine().
// num_seg : segment number on the target.
// elem    : bytes to append; only the span is stored, nothing is copied here.
375void MpiMultiMachineShMemWinBaseInternal::
376requestAddToAnotherSegment(Int32 thread, Int32 rank, Int32 num_seg, Span<const std::byte> elem)
// The byte count must be a whole number of elements.
378 if (elem.size() % m_sizeof_type) {
// Nothing to add.
381 if (elem.empty() || elem.data() ==
nullptr) {
385 const Int32 machine_rank = _worldToMachine(rank);
386 const Int32 target_segment_infos_pos = num_seg + machine_rank * m_nb_segments_per_proc;
// Publish, in the shared array, which segment position this local slot targets.
389 const Int32 segment_infos_pos = thread + m_comm_machine_rank * m_nb_segments_per_proc;
390 m_target_segments[segment_infos_pos] = target_segment_infos_pos;
// Look up the memory currently reserved for the target segment.
393 Span<std::byte> rank_reserved_part_span;
396 std::byte* ptr_seg =
nullptr;
398 int error = MPI_Win_shared_query(m_all_mpi_win[target_segment_infos_pos], machine_rank, &size_seg, &size_type, &ptr_seg);
400 if (error != MPI_SUCCESS) {
403 rank_reserved_part_span = Span<std::byte>{ ptr_seg, size_seg };
406 const Int64 actual_sizeof_win = m_sizeof_used_part[target_segment_infos_pos];
407 const Int64 future_sizeof_win = actual_sizeof_win + elem.size();
408 const Int64 old_reserved = rank_reserved_part_span.size();
// Post (or clear) a reallocation request on the target segment's entry.
410 if (future_sizeof_win > old_reserved) {
411 _requestRealloc(target_segment_infos_pos, future_sizeof_win);
414 _requestRealloc(target_segment_infos_pos);
417 m_add_requests[thread] = elem;
418 m_add_requested =
true;
// Executes the pending "add to another segment" requests: each local slot with
// a request copies its bytes after the used part of the targeted segment,
// updates that segment's used size and clears the request. After a barrier,
// the cached spans of the local segments flagged in is_my_seg_edited are
// refreshed.
// NOTE(review): several statements of this method are not visible in this
// listing; the comments below only describe the visible ones.
424void MpiMultiMachineShMemWinBaseInternal::
425executeAddToAnotherSegment()
427 MPI_Barrier(m_comm_machine);
// Flag the local segments that another process is about to modify.
// NOTE(review): two suspicious points, to be confirmed:
//  - the array holds m_comm_machine_size entries but is indexed by num_seg,
//    which ranges over m_nb_segments_per_proc;
//  - m_target_segments entries are written as segment positions
//    (num_seg + machine_rank * m_nb_segments_per_proc) in
//    requestAddToAnotherSegment(), yet they are compared to a machine rank here.
432 auto is_my_seg_edited = std::make_unique<bool[]>(m_comm_machine_size);
433 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
434 for (
const Int32 rank_asked : m_target_segments) {
435 if (rank_asked == m_comm_machine_rank) {
436 is_my_seg_edited[num_seg] =
true;
445 if (!m_add_requested) {
450 m_add_requested =
false;
// For each local request, scan all published targets for the same segment
// position; the fatal error reports a target requested more than once.
453 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
454 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
455 const Int32 seg_needs_to_edit = m_target_segments[segment_infos_pos];
456 if (seg_needs_to_edit == -1)
459 bool is_found =
false;
460 for (
const Int32 rank_asked : m_target_segments) {
461 if (rank_asked == seg_needs_to_edit) {
466 ARCCORE_FATAL(
"Two subdomains ask same rank for addToAnotherSegment()");
// Perform the copies into the targeted segments.
475 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
476 if (m_add_requests[num_seg].empty() || m_add_requests[num_seg].data() ==
nullptr) {
480 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
481 const Int32 target_segment_infos_pos = m_target_segments[segment_infos_pos];
482 if (target_segment_infos_pos == -1) {
486 const Int64 actual_sizeof_win = m_sizeof_used_part[target_segment_infos_pos];
487 const Int64 future_sizeof_win = actual_sizeof_win + m_add_requests[num_seg].size();
490 Span<std::byte> rank_reserved_part_span;
493 std::byte* ptr_seg =
nullptr;
// The owner's machine rank is recovered from the segment position.
495 int error = MPI_Win_shared_query(m_all_mpi_win[target_segment_infos_pos], target_segment_infos_pos / m_nb_segments_per_proc, &size_seg, &size_type, &ptr_seg);
497 if (error != MPI_SUCCESS) {
500 rank_reserved_part_span = Span<std::byte>{ ptr_seg, size_seg };
// NOTE(review): placeholders {1}/{2} with two arguments, whereas the resize
// path uses {0}/{1} for the same message — confirm the formatter's indexing.
503 if (rank_reserved_part_span.size() < future_sizeof_win) {
504 ARCCORE_FATAL(
"Bad realloc -- New size : {1} -- Needed size : {2}", rank_reserved_part_span.size(), future_sizeof_win);
// Byte-by-byte copy after the used part of the target segment.
508 for (Int64 pos_win = actual_sizeof_win, pos_elem = 0; pos_win < future_sizeof_win; ++pos_win, ++pos_elem) {
509 rank_reserved_part_span[pos_win] = m_add_requests[num_seg][pos_elem];
511 m_sizeof_used_part[target_segment_infos_pos] = future_sizeof_win;
// Mark the request as consumed, locally and in the shared array.
513 m_add_requests[num_seg] = Span<const std::byte>{
nullptr, 0 };
514 m_target_segments[segment_infos_pos] = -1;
// Every process must have finished writing before the local spans are refreshed.
517 MPI_Barrier(m_comm_machine);
520 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
521 if (is_my_seg_edited[num_seg]) {
522 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
525 std::byte* ptr_seg =
nullptr;
527 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], m_comm_machine_rank, &size_seg, &size_type, &ptr_seg);
529 if (error != MPI_SUCCESS) {
532 m_reserved_part_span[num_seg] = Span<std::byte>{ ptr_seg, size_seg };
// Registers a request to grow the memory reserved for local segment `num_seg`
// to `new_capacity`. A reallocation is requested only when `new_capacity`
// exceeds the currently reserved size; otherwise any pending reallocation
// request for this segment is cleared.
540void MpiMultiMachineShMemWinBaseInternal::
541requestReserve(Int32 num_seg, Int64 new_capacity)
// The capacity must be a whole number of elements.
543 if (new_capacity % m_sizeof_type) {
547 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
549 if (new_capacity > m_reserved_part_span[num_seg].size()) {
550 _requestRealloc(segment_infos_pos, new_capacity);
553 _requestRealloc(segment_infos_pos);
// NOTE(review): only the return type and class qualifier of this method are
// visible in this listing; its name and body are not shown.
560void MpiMultiMachineShMemWinBaseInternal::
// Registers a request to set the used size of local segment `num_seg` to
// `new_size`. The value -1 is handled separately first; any other negative
// value, or a value that is not a multiple of the element size, is rejected.
// A reallocation is requested when `new_size` exceeds the reserved memory.
569void MpiMultiMachineShMemWinBaseInternal::
570requestResize(Int32 num_seg, Int64 new_size)
// -1 is the "no request" marker used by m_resize_requests.
572 if (new_size == -1) {
575 if (new_size < 0 || new_size % m_sizeof_type) {
579 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
581 if (new_size > m_reserved_part_span[num_seg].size()) {
582 _requestRealloc(segment_infos_pos, new_size);
585 _requestRealloc(segment_infos_pos);
588 m_resize_requests[num_seg] = new_size;
589 m_resize_requested =
true;
// Executes the pending resize requests of the local segments: for each segment
// with a request (value other than -1), checks that the reserved memory is
// large enough, stores the new used size and clears the request.
// NOTE(review): the line carrying the method name and the statements between
// the visible ones are not shown in this listing.
595void MpiMultiMachineShMemWinBaseInternal::
// Fast exit when no resize was requested since the last execution.
600 if (!m_resize_requested) {
603 m_resize_requested =
false;
605 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
606 if (m_resize_requests[num_seg] == -1) {
610 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
// The reserved memory must already cover the requested size at this point.
612 if (m_reserved_part_span[num_seg].size() < m_resize_requests[num_seg]) {
613 ARCCORE_FATAL(
"Bad realloc -- New size : {0} -- Needed size : {1}", m_reserved_part_span[num_seg].size(), m_resize_requests[num_seg]);
616 m_sizeof_used_part[segment_infos_pos] = m_resize_requests[num_seg];
617 m_resize_requests[num_seg] = -1;
// For every local segment, requests a reallocation to exactly the used size
// when the reserved size differs from it, and clears the reallocation request
// when both sizes are already equal.
// NOTE(review): the line carrying the method name is not visible in this listing.
624void MpiMultiMachineShMemWinBaseInternal::
627 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
628 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
630 if (m_reserved_part_span[num_seg].size() == m_sizeof_used_part[segment_infos_pos]) {
631 _requestRealloc(segment_infos_pos);
634 _requestRealloc(segment_infos_pos, m_sizeof_used_part[segment_infos_pos]);
643void MpiMultiMachineShMemWinBaseInternal::
644_requestRealloc(Int32 owner_pos_segment, Int64 new_capacity)
const
646 m_need_resize[owner_pos_segment] = new_capacity;
652void MpiMultiMachineShMemWinBaseInternal::
653_requestRealloc(Int32 owner_pos_segment)
const
655 m_need_resize[owner_pos_segment] = -1;
// Synchronises the processes of the machine, then resets this process'
// entries of m_need_resize to -1 (no request) and synchronises again.
// NOTE(review): the line carrying the method name and the statements between
// the two barriers other than the reset loop are not visible in this listing.
661void MpiMultiMachineShMemWinBaseInternal::
// All requests must be posted before anyone reads m_need_resize.
666 MPI_Barrier(m_comm_machine);
672 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
673 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
674 m_need_resize[segment_infos_pos] = -1;
682 MPI_Barrier(m_comm_machine);
// Reallocates every segment (of any rank) that has a pending request in
// m_need_resize: a new shared window is created collectively, the owner copies
// the still-valid bytes from the old window into the new one, and the old
// window is freed. Finally the cached spans of the local segments are
// refreshed from the (possibly new) windows.
// NOTE(review): the line carrying the method name, some declarations and the
// statements executed after a failure are not visible in this listing.
688void MpiMultiMachineShMemWinBaseInternal::
692 MPI_Info_create(&win_info);
693 MPI_Info_set(win_info,
"alloc_shared_noncontig",
"true");
// Every process walks the same (rank, segment) order so that the collective
// window creations match.
696 for (Integer rank = 0; rank < m_comm_machine_size; ++rank) {
697 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
699 const Int32 local_segment_infos_pos = num_seg + rank * m_nb_segments_per_proc;
// -1 means no reallocation requested for this segment.
701 if (m_need_resize[local_segment_infos_pos] == -1)
704 ARCCORE_ASSERT(m_need_resize[local_segment_infos_pos] >= 0, (
"New size must be >= 0"));
705 ARCCORE_ASSERT(m_need_resize[local_segment_infos_pos] % m_sizeof_type == 0, (
"New size must be % sizeof type"));
// Only the owner contributes memory; a requested size of 0 still reserves
// room for one element.
709 const Int64 size_seg = (m_comm_machine_rank == rank ? (m_need_resize[local_segment_infos_pos] == 0 ? m_sizeof_type : m_need_resize[local_segment_infos_pos]) : 0);
// Keep the old handle: the slot is overwritten by the new window below.
712 MPI_Win old_win = m_all_mpi_win[local_segment_infos_pos];
713 std::byte* ptr_new_seg =
nullptr;
716 int error = MPI_Win_allocate_shared(size_seg, m_sizeof_type, win_info, m_comm_machine, &ptr_new_seg, &m_all_mpi_win[local_segment_infos_pos]);
717 if (error != MPI_SUCCESS) {
718 MPI_Win_free(&old_win);
// The owner moves its data from the old window to the new one.
723 if (m_comm_machine_rank == rank) {
729 std::byte* ptr_old_seg =
nullptr;
730 MPI_Aint mpi_reserved_size_new_seg;
734 MPI_Aint size_old_seg;
738 error = MPI_Win_shared_query(old_win, m_comm_machine_rank, &size_old_seg, &size_type, &ptr_old_seg);
739 if (error != MPI_SUCCESS || ptr_old_seg ==
nullptr) {
740 MPI_Win_free(&old_win);
747 std::byte* ptr_seg =
nullptr;
// The address reported by the query must match the one returned at allocation.
751 error = MPI_Win_shared_query(m_all_mpi_win[local_segment_infos_pos], m_comm_machine_rank, &mpi_reserved_size_new_seg, &size_type, &ptr_seg);
752 if (error != MPI_SUCCESS || ptr_seg ==
nullptr || ptr_seg != ptr_new_seg) {
753 MPI_Win_free(&old_win);
// Copy no more than what was used and no more than the new capacity.
761 const Int64 min_size = std::min(m_need_resize[local_segment_infos_pos], m_sizeof_used_part[local_segment_infos_pos]);
763 memcpy(ptr_new_seg, ptr_old_seg, min_size);
766 MPI_Win_free(&old_win);
769 MPI_Info_free(&win_info);
// Refresh the cached address/reserved size of every local segment.
772 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
773 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
777 std::byte* ptr_seg =
nullptr;
778 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], m_comm_machine_rank, &size_seg, &size_type, &ptr_seg);
780 if (error != MPI_SUCCESS) {
784 m_reserved_part_span[num_seg] = Span<std::byte>{ ptr_seg, size_seg };
// Converts a world rank into a rank of the machine communicator by searching
// `world` linearly in m_machine_ranks.
// NOTE(review): the statements run on a match and when no entry matches are
// not visible in this listing.
791Int32 MpiMultiMachineShMemWinBaseInternal::
792_worldToMachine(Int32 world)
const
794 for (Int32 i = 0; i < m_comm_machine_size; ++i) {
795 if (m_machine_ranks[i] == world) {
805Int32 MpiMultiMachineShMemWinBaseInternal::
806_machineToWorld(Int32 machine)
const
808 return m_machine_ranks[machine];
#define ARCCORE_FATAL(...)
Macro envoyant une exception FatalErrorException.
std::int32_t Int32
Type entier signé sur 32 bits.