15#include "arccore/message_passing_mpi/internal/MpiMultiMachineShMemWinBaseInternal.h"
17#include "arccore/base/FatalErrorException.h"
22namespace Arcane::MessagePassing::Mpi
// Constructor: creates the MPI shared-memory windows used by the processes of
// the communicator `comm_machine`.
//
// Steps visible in this block:
//  - checks `sizeof_type`, every entry of `sizeof_segments` and
//    `nb_segments_per_proc`;
//  - allocates one window per (machine rank, segment) pair, stored in
//    m_all_mpi_win at index `segment + rank * nb_segments_per_proc`;
//  - allocates three bookkeeping windows holding, for every segment of every
//    rank: the pending reallocation size (m_need_resize), the used size
//    (m_sizeof_used_part) and the target of a pending "add to another segment"
//    request (m_target_segments);
//  - ends with a barrier on the machine communicator.
//
// Parameters:
//  sizeof_segments       initial size of each local segment; it is handed to
//                        MPI_Win_allocate_shared as a byte size and must be a
//                        non-negative multiple of `sizeof_type`.
//  nb_segments_per_proc  number of segments owned by each process.
//  sizeof_type           size of one element; also used as the window
//                        displacement unit.
//  comm_machine          communicator on which all windows are created.
//  comm_machine_rank     rank of this process in `comm_machine`.
//  comm_machine_size     number of processes in `comm_machine`.
//  machine_ranks         indexed by machine rank; _worldToMachine() compares
//                        its entries with world ranks.
//
// NOTE(review): the bodies of the validation branches and of the MPI error
// branches are not visible here — presumably they raise ARCCORE_FATAL; confirm.
29MpiMultiMachineShMemWinBaseInternal::
30MpiMultiMachineShMemWinBaseInternal(SmallSpan<Int64> sizeof_segments,
Int32 nb_segments_per_proc,
Int32 sizeof_type,
const MPI_Comm& comm_machine,
Int32 comm_machine_rank,
Int32 comm_machine_size, ConstArrayView<Int32> machine_ranks)
32, m_win_actual_sizeof()
33, m_win_target_segments()
34, m_comm_machine(comm_machine)
35, m_comm_machine_size(comm_machine_size)
36, m_comm_machine_rank(comm_machine_rank)
37, m_sizeof_type(sizeof_type)
38, m_nb_segments_per_proc(nb_segments_per_proc)
39, m_machine_ranks(machine_ranks)
// NOTE(review): both request arrays are sized with nb_segments_per_proc before
// that value is checked further down — confirm a non-positive count is harmless here.
40, m_add_requests(nb_segments_per_proc)
41, m_resize_requests(nb_segments_per_proc)
// Argument validation. The element size is checked first because it is used
// as a divisor in the per-segment check.
43 if (m_sizeof_type <= 0) {
46 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
47 if (sizeof_segments[i] < 0 || sizeof_segments[i] % m_sizeof_type != 0) {
51 if (m_nb_segments_per_proc <= 0) {
// One window handle per (machine rank, segment); one local span per segment.
54 m_all_mpi_win.resize(m_comm_machine_size * m_nb_segments_per_proc);
55 m_reserved_part_span.resize(m_nb_segments_per_proc);
// No pending "add" (empty span) and no pending "resize" (-1) at start.
57 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
58 m_add_requests[num_seg] = Span<const std::byte>{
nullptr, 0 };
59 m_resize_requests[num_seg] = -1;
// Two MPI_Info objects: "alloc_shared_noncontig" = "true" for the segment
// windows, "false" for the three bookkeeping windows.
62 MPI_Info win_info_true;
63 MPI_Info_create(&win_info_true);
64 MPI_Info_set(win_info_true,
"alloc_shared_noncontig",
"true");
66 MPI_Info win_info_false;
67 MPI_Info_create(&win_info_false);
68 MPI_Info_set(win_info_false,
"alloc_shared_noncontig",
"false");
// Index of this process's first entry in the per-(rank, segment) arrays.
70 const Int32 pos_my_wins = m_comm_machine_rank * m_nb_segments_per_proc;
// Every process takes part in the creation of every window (i = owning machine
// rank, j = segment). For its own segments a process asks for
// sizeof_segments[j] bytes, or m_sizeof_type bytes when the requested size is 0.
// NOTE(review): the declaration of size_seg is not visible — presumably it is
// 0 for windows owned by another rank; confirm.
74 for (Integer i = 0; i < m_comm_machine_size; ++i) {
75 for (Integer j = 0; j < m_nb_segments_per_proc; ++j) {
77 if (m_comm_machine_rank == i) {
78 if (sizeof_segments[j] == 0)
79 size_seg = m_sizeof_type;
81 size_seg = sizeof_segments[j];
83 std::byte* ptr_seg =
nullptr;
84 int error = MPI_Win_allocate_shared(size_seg, m_sizeof_type, win_info_true, m_comm_machine, &ptr_seg, &m_all_mpi_win[j + i * m_nb_segments_per_proc]);
86 if (error != MPI_SUCCESS) {
// Query each local window to get its base address and reserved size, and keep
// them as the reserved span of the segment.
92 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
95 std::byte* ptr_seg =
nullptr;
96 int error = MPI_Win_shared_query(m_all_mpi_win[i + pos_my_wins], m_comm_machine_rank, &size_seg, &size_type, &ptr_seg);
98 if (error != MPI_SUCCESS) {
105 m_reserved_part_span[i] = Span<std::byte>{ ptr_seg, size_seg };
// Bookkeeping window 1: pending reallocation sizes. Each process contributes
// nb_segments_per_proc Int64 values; the window is then viewed from the base
// address of rank 0 as one array of comm_machine_size * nb_segments_per_proc
// entries. Local entries start at -1.
110 Int64* ptr_seg =
nullptr;
111 Int64* ptr_win =
nullptr;
113 int error = MPI_Win_allocate_shared(
static_cast<Int64
>(
sizeof(Int64)) * m_nb_segments_per_proc,
sizeof(Int64), win_info_false, m_comm_machine, &ptr_seg, &m_win_need_resize);
115 if (error != MPI_SUCCESS) {
122 int error = MPI_Win_shared_query(m_win_need_resize, 0, &size_seg, &size_type, &ptr_win);
124 if (error != MPI_SUCCESS) {
128 m_need_resize = Span<Int64>{ ptr_win, m_comm_machine_size * m_nb_segments_per_proc };
130 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
131 m_need_resize[i + pos_my_wins] = -1;
// Checks that the local part sits at offset pos_my_wins from rank 0's base.
// NOTE(review): the branch body is not visible — presumably a fatal error.
134 if (ptr_win + pos_my_wins != ptr_seg) {
// Bookkeeping window 2: used size of every segment, same layout as above.
// Local entries start at the sizes given in sizeof_segments.
140 Int64* ptr_seg =
nullptr;
141 Int64* ptr_win =
nullptr;
143 int error = MPI_Win_allocate_shared(
static_cast<Int64
>(
sizeof(Int64)) * m_nb_segments_per_proc,
sizeof(Int64), win_info_false, m_comm_machine, &ptr_seg, &m_win_actual_sizeof);
145 if (error != MPI_SUCCESS) {
152 int error = MPI_Win_shared_query(m_win_actual_sizeof, 0, &size_seg, &size_type, &ptr_win);
154 if (error != MPI_SUCCESS) {
158 m_sizeof_used_part = Span<Int64>{ ptr_win, m_comm_machine_size * m_nb_segments_per_proc };
160 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
161 m_sizeof_used_part[i + pos_my_wins] = sizeof_segments[i];
164 if (ptr_win + pos_my_wins != ptr_seg) {
// Bookkeeping window 3: target segment position of a pending
// requestAddToAnotherSegment() (Int32 entries, -1 when none), same layout.
170 Int32* ptr_seg =
nullptr;
171 Int32* ptr_win =
nullptr;
173 int error = MPI_Win_allocate_shared(
static_cast<Int64
>(
sizeof(Int32)) * m_nb_segments_per_proc,
sizeof(Int32), win_info_false, m_comm_machine, &ptr_seg, &m_win_target_segments);
175 if (error != MPI_SUCCESS) {
182 int error = MPI_Win_shared_query(m_win_target_segments, 0, &size_seg, &size_type, &ptr_win);
184 if (error != MPI_SUCCESS) {
188 m_target_segments = Span<Int32>{ ptr_win, m_comm_machine_size * m_nb_segments_per_proc };
190 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
191 m_target_segments[i + pos_my_wins] = -1;
194 if (ptr_win + pos_my_wins != ptr_seg) {
// The info objects are no longer needed once every window exists.
199 MPI_Info_free(&win_info_false);
200 MPI_Info_free(&win_info_true);
// Wait until every process of the machine has initialised its entries.
202 MPI_Barrier(m_comm_machine);
208MpiMultiMachineShMemWinBaseInternal::
209~MpiMultiMachineShMemWinBaseInternal()
211 for (Integer i = 0; i < m_comm_machine_size * m_nb_segments_per_proc; ++i) {
212 MPI_Win_free(&m_all_mpi_win[i]);
214 MPI_Win_free(&m_win_need_resize);
215 MPI_Win_free(&m_win_actual_sizeof);
216 MPI_Win_free(&m_win_target_segments);
// Returns m_sizeof_type, the per-element size stored by the constructor.
// NOTE(review): the line carrying the method name is not visible here.
222Int32 MpiMultiMachineShMemWinBaseInternal::
225 return m_sizeof_type;
// Returns the view m_machine_ranks stored by the constructor. The array is
// indexed by machine rank and its entries are compared with world ranks in
// _worldToMachine().
// NOTE(review): the line carrying the method name is not visible here.
231ConstArrayView<Int32> MpiMultiMachineShMemWinBaseInternal::
234 return m_machine_ranks;
// Blocks until every process of the machine communicator reaches this call.
// NOTE(review): the line carrying the method name is not visible here.
240void MpiMultiMachineShMemWinBaseInternal::
243 MPI_Barrier(m_comm_machine);
249Span<std::byte> MpiMultiMachineShMemWinBaseInternal::
250segmentView(Int32 num_seg)
252 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
253 return m_reserved_part_span[num_seg].subSpan(0, m_sizeof_used_part[segment_infos_pos]);
259Span<std::byte> MpiMultiMachineShMemWinBaseInternal::
260segmentView(Int32 rank, Int32 num_seg)
262 const Int32 segment_infos_pos = num_seg + _worldToMachine(rank) * m_nb_segments_per_proc;
266 std::byte* ptr_seg =
nullptr;
267 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], rank, &size_seg, &size_type, &ptr_seg);
269 if (error != MPI_SUCCESS) {
273 return Span<std::byte>{ ptr_seg, m_sizeof_used_part[segment_infos_pos] };
279Span<const std::byte> MpiMultiMachineShMemWinBaseInternal::
280segmentConstView(Int32 num_seg)
const
282 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
283 return m_reserved_part_span[num_seg].subSpan(0, m_sizeof_used_part[segment_infos_pos]);
289Span<const std::byte> MpiMultiMachineShMemWinBaseInternal::
290segmentConstView(Int32 rank, Int32 num_seg)
const
292 const Int32 segment_infos_pos = num_seg + _worldToMachine(rank) * m_nb_segments_per_proc;
296 std::byte* ptr_seg =
nullptr;
297 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], rank, &size_seg, &size_type, &ptr_seg);
299 if (error != MPI_SUCCESS) {
303 return Span<const std::byte>{ ptr_seg, m_sizeof_used_part[segment_infos_pos] };
// Records a request to append the bytes of `elem` to local segment `num_seg`.
//
// Nothing is copied here: the span `elem` itself is stored in m_add_requests,
// so the memory it points to must stay valid until the request is executed.
//
// Parameters:
//  num_seg  index of the local segment to extend.
//  elem     bytes to append; its size is tested against m_sizeof_type.
// NOTE(review): the bodies of the two guard branches are not visible —
// presumably a fatal error for a bad size and an early return for an empty
// span; confirm.
309void MpiMultiMachineShMemWinBaseInternal::
310requestAdd(Int32 num_seg, Span<const std::byte> elem)
312 if (elem.size() % m_sizeof_type) {
315 if (elem.empty() || elem.data() ==
nullptr) {
319 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
// Size the segment will have once `elem` is appended, versus what is reserved.
321 const Int64 actual_sizeof_win = m_sizeof_used_part[segment_infos_pos];
322 const Int64 future_sizeof_win = actual_sizeof_win + elem.size();
323 const Int64 old_reserved = m_reserved_part_span[num_seg].size();
// Ask for a reallocation only when the reserved space is too small; the
// one-argument overload resets the pending reallocation to -1.
325 if (future_sizeof_win > old_reserved) {
326 _requestRealloc(segment_infos_pos, future_sizeof_win);
329 _requestRealloc(segment_infos_pos);
332 m_add_requests[num_seg] = elem;
333 m_add_requested =
true;
// Executes the additions recorded in m_add_requests: for every local segment
// with a pending request, copies the requested bytes right after the used part
// of the segment and updates the used size.
//
// The error message uses the placeholders {0} and {1}, matching its two
// arguments and the identical message used for resize requests.
//
// NOTE(review): the line carrying the method name and the statements before
// the first test are not visible here — presumably this is executeAdd() and it
// first performs the pending reallocations; confirm.
339void MpiMultiMachineShMemWinBaseInternal::
// Nothing to copy when no addition was requested since the last execution.
344 if (!m_add_requested) {
347 m_add_requested =
false;
349 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
// Segments without a pending request hold an empty span.
350 if (m_add_requests[num_seg].empty() || m_add_requests[num_seg].data() ==
nullptr) {
354 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
356 const Int64 actual_sizeof_win = m_sizeof_used_part[segment_infos_pos];
357 const Int64 future_sizeof_win = actual_sizeof_win + m_add_requests[num_seg].size();
// The reserved space must already be large enough at this point.
359 if (m_reserved_part_span[num_seg].size() < future_sizeof_win) {
360 ARCCORE_FATAL(
"Bad realloc -- New size : {0} -- Needed size : {1}", m_reserved_part_span[num_seg].size(), future_sizeof_win);
// Byte-wise copy of the request after the currently used part.
363 for (Int64 pos_win = actual_sizeof_win, pos_elem = 0; pos_win < future_sizeof_win; ++pos_win, ++pos_elem) {
364 m_reserved_part_span[num_seg][pos_win] = m_add_requests[num_seg][pos_elem];
366 m_sizeof_used_part[segment_infos_pos] = future_sizeof_win;
// Mark the request as consumed.
368 m_add_requests[num_seg] = Span<const std::byte>{
nullptr, 0 };
// Records a request to append the bytes of `elem` to a segment that may be
// owned by another process of the machine.
//
// Parameters:
//  thread   index of the local slot used to store the request: it selects the
//           entry of m_add_requests and of this process's part of
//           m_target_segments.
//  rank     world rank of the process owning the target segment (converted
//           with _worldToMachine()).
//  num_seg  index of the target segment on that process.
//  elem     bytes to append; the span itself is stored, not a copy.
// NOTE(review): the bodies of the guard branches and of the MPI error branch
// are not visible here.
375void MpiMultiMachineShMemWinBaseInternal::
376requestAddToAnotherSegment(Int32 thread, Int32 rank, Int32 num_seg, Span<const std::byte> elem)
378 if (elem.size() % m_sizeof_type) {
381 if (elem.empty() || elem.data() ==
nullptr) {
// Position of the target segment in the per-(rank, segment) arrays.
385 const Int32 machine_rank = _worldToMachine(rank);
386 const Int32 target_segment_infos_pos = num_seg + machine_rank * m_nb_segments_per_proc;
// Publish the target in the shared array so that the other processes can see it.
389 const Int32 segment_infos_pos = thread + m_comm_machine_rank * m_nb_segments_per_proc;
390 m_target_segments[segment_infos_pos] = target_segment_infos_pos;
// The reserved size of a segment owned by another process is only known
// through a query on its window.
393 Span<std::byte> rank_reserved_part_span;
396 std::byte* ptr_seg =
nullptr;
398 int error = MPI_Win_shared_query(m_all_mpi_win[target_segment_infos_pos], machine_rank, &size_seg, &size_type, &ptr_seg);
400 if (error != MPI_SUCCESS) {
403 rank_reserved_part_span = Span<std::byte>{ ptr_seg, size_seg };
406 const Int64 actual_sizeof_win = m_sizeof_used_part[target_segment_infos_pos];
407 const Int64 future_sizeof_win = actual_sizeof_win + elem.size();
408 const Int64 old_reserved = rank_reserved_part_span.size();
// Ask for a reallocation of the target segment only when it is too small; the
// one-argument overload resets the pending reallocation to -1.
410 if (future_sizeof_win > old_reserved) {
411 _requestRealloc(target_segment_infos_pos, future_sizeof_win);
414 _requestRealloc(target_segment_infos_pos);
417 m_add_requests[thread] = elem;
418 m_add_requested =
true;
// Executes the requests recorded by requestAddToAnotherSegment().
//
// Visible steps:
//  1. after a barrier, notes which local segments are targeted by a request of
//     any process of the machine;
//  2. checks that no two requests target the same segment;
//  3. copies each local request after the used part of its target segment and
//     updates the used size of that segment;
//  4. after a second barrier, refreshes the local span of every segment that
//     was targeted.
//
// Details that matter for correctness:
//  - `is_my_seg_edited` is indexed by local segment number, so it holds
//    m_nb_segments_per_proc entries;
//  - m_target_segments stores segment positions
//    (`segment + machine rank * nb_segments_per_proc`), so a local segment is
//    targeted when an entry equals its own position;
//  - the "Bad realloc" message uses the placeholders {0} and {1}, matching its
//    two arguments.
//
// NOTE(review): several statements (reallocation calls, `continue`/`break`,
// MPI error handling) are not visible here.
424void MpiMultiMachineShMemWinBaseInternal::
425executeAddToAnotherSegment()
// Every request must have been published before the shared array is read.
427 MPI_Barrier(m_comm_machine);
429 auto is_my_seg_edited = std::make_unique<bool[]>(m_nb_segments_per_proc);
430 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
431 for (
const Int32 rank_asked : m_target_segments) {
432 if (rank_asked == num_seg + m_comm_machine_rank * m_nb_segments_per_proc) {
433 is_my_seg_edited[num_seg] =
true;
439 if (!m_add_requested) {
444 m_add_requested =
false;
// Each target may be requested at most once over the whole machine.
445 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
446 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
447 const Int32 seg_needs_to_edit = m_target_segments[segment_infos_pos];
448 if (seg_needs_to_edit == -1)
451 bool is_found =
false;
452 for (
const Int32 rank_asked : m_target_segments) {
453 if (rank_asked == seg_needs_to_edit) {
458 ARCCORE_FATAL(
"Two subdomains ask same rank for addToAnotherSegment()");
// Copy every local request into its target segment.
466 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
467 if (m_add_requests[num_seg].empty() || m_add_requests[num_seg].data() ==
nullptr) {
471 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
472 const Int32 target_segment_infos_pos = m_target_segments[segment_infos_pos];
473 if (target_segment_infos_pos == -1) {
477 const Int64 actual_sizeof_win = m_sizeof_used_part[target_segment_infos_pos];
478 const Int64 future_sizeof_win = actual_sizeof_win + m_add_requests[num_seg].size();
480 Span<std::byte> rank_reserved_part_span;
483 std::byte* ptr_seg =
nullptr;
// The owner's machine rank is recovered from the segment position.
485 int error = MPI_Win_shared_query(m_all_mpi_win[target_segment_infos_pos], target_segment_infos_pos / m_nb_segments_per_proc, &size_seg, &size_type, &ptr_seg);
487 if (error != MPI_SUCCESS) {
490 rank_reserved_part_span = Span<std::byte>{ ptr_seg, size_seg };
// The target segment must already be large enough at this point.
493 if (rank_reserved_part_span.size() < future_sizeof_win) {
494 ARCCORE_FATAL(
"Bad realloc -- New size : {0} -- Needed size : {1}", rank_reserved_part_span.size(), future_sizeof_win);
497 for (Int64 pos_win = actual_sizeof_win, pos_elem = 0; pos_win < future_sizeof_win; ++pos_win, ++pos_elem) {
498 rank_reserved_part_span[pos_win] = m_add_requests[num_seg][pos_elem];
500 m_sizeof_used_part[target_segment_infos_pos] = future_sizeof_win;
// Mark the request as consumed, locally and in the shared array.
502 m_add_requests[num_seg] = Span<const std::byte>{
nullptr, 0 };
503 m_target_segments[segment_infos_pos] = -1;
// Wait until every process has finished writing into the segments.
506 MPI_Barrier(m_comm_machine);
// Refresh the local span of each segment that another request has targeted.
508 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
509 if (is_my_seg_edited[num_seg]) {
510 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
513 std::byte* ptr_seg =
nullptr;
515 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], m_comm_machine_rank, &size_seg, &size_type, &ptr_seg);
517 if (error != MPI_SUCCESS) {
520 m_reserved_part_span[num_seg] = Span<std::byte>{ ptr_seg, size_seg };
// Records a request to reserve at least `new_capacity` bytes for local
// segment `num_seg`.
//
// When the segment already reserves that much, the pending reallocation of the
// segment is reset to -1 instead; otherwise `new_capacity` is recorded as the
// size to reallocate to.
// NOTE(review): the body of the first branch (capacity not a multiple of
// m_sizeof_type) is not visible — presumably a fatal error; confirm.
528void MpiMultiMachineShMemWinBaseInternal::
529requestReserve(Int32 num_seg, Int64 new_capacity)
531 if (new_capacity % m_sizeof_type) {
535 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
537 if (new_capacity <= m_reserved_part_span[num_seg].size()) {
538 _requestRealloc(segment_infos_pos);
541 _requestRealloc(segment_infos_pos, new_capacity);
547void MpiMultiMachineShMemWinBaseInternal::
// Records a request to set the used size of local segment `num_seg` to
// `new_size` bytes.
//
// A reallocation is requested only when `new_size` exceeds the reserved size;
// otherwise the pending reallocation of the segment is reset to -1. The new
// size itself is stored in m_resize_requests and applied later.
// NOTE(review): the bodies of the two guard branches are not visible —
// presumably an early return for -1 and a fatal error for a negative size or
// one that is not a multiple of m_sizeof_type; confirm.
556void MpiMultiMachineShMemWinBaseInternal::
557requestResize(Int32 num_seg, Int64 new_size)
// -1 is the value m_resize_requests uses for "no request".
559 if (new_size == -1) {
562 if (new_size < 0 || new_size % m_sizeof_type) {
566 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
568 if (new_size > m_reserved_part_span[num_seg].size()) {
569 _requestRealloc(segment_infos_pos, new_size);
572 _requestRealloc(segment_infos_pos);
575 m_resize_requests[num_seg] = new_size;
576 m_resize_requested =
true;
// Applies the sizes recorded by requestResize(): for every local segment with
// a pending request, checks that the reserved space is large enough and sets
// the used size to the requested value.
// NOTE(review): the line carrying the method name and the statements before
// the first test are not visible here — presumably the pending reallocations
// are performed first; confirm.
582void MpiMultiMachineShMemWinBaseInternal::
// Nothing to do when no resize was requested since the last execution.
587 if (!m_resize_requested) {
590 m_resize_requested =
false;
592 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
// -1 marks a segment without a pending resize.
593 if (m_resize_requests[num_seg] == -1) {
597 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
599 if (m_reserved_part_span[num_seg].size() < m_resize_requests[num_seg]) {
600 ARCCORE_FATAL(
"Bad realloc -- New size : {0} -- Needed size : {1}", m_reserved_part_span[num_seg].size(), m_resize_requests[num_seg]);
603 m_sizeof_used_part[segment_infos_pos] = m_resize_requests[num_seg];
// Mark the request as consumed.
604 m_resize_requests[num_seg] = -1;
// For every local segment, requests a reallocation down to the used size when
// the reserved size differs from it; when both sizes are equal the pending
// reallocation of the segment is reset to -1.
// NOTE(review): the line carrying the method name and the statements after the
// loop are not visible here — presumably this is the shrink operation and the
// reallocations are executed afterwards; confirm.
611void MpiMultiMachineShMemWinBaseInternal::
614 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
615 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
617 if (m_reserved_part_span[num_seg].size() == m_sizeof_used_part[segment_infos_pos]) {
618 _requestRealloc(segment_infos_pos);
621 _requestRealloc(segment_infos_pos, m_sizeof_used_part[segment_infos_pos]);
630void MpiMultiMachineShMemWinBaseInternal::
631_requestRealloc(Int32 owner_pos_segment, Int64 new_capacity)
const
633 m_need_resize[owner_pos_segment] = new_capacity;
639void MpiMultiMachineShMemWinBaseInternal::
640_requestRealloc(Int32 owner_pos_segment)
const
642 m_need_resize[owner_pos_segment] = -1;
// Synchronises the processes of the machine around the pending reallocations:
// barrier, reset of this process's entries of m_need_resize to -1, barrier.
// NOTE(review): the line carrying the method name and the statements between
// the first barrier and the loop are not visible here — presumably the
// reallocation itself is performed there; confirm.
648void MpiMultiMachineShMemWinBaseInternal::
// Every request must be written in the shared array before it is read.
653 MPI_Barrier(m_comm_machine);
// Each process clears the requests concerning its own segments only.
659 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
660 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
661 m_need_resize[segment_infos_pos] = -1;
// No process may record a new request before every entry has been cleared.
669 MPI_Barrier(m_comm_machine);
// Re-creates the window of every segment that has a pending reallocation in
// m_need_resize, then refreshes the local reserved spans.
//
// For each (machine rank, segment) whose entry of m_need_resize is handled:
//  - a new window is allocated collectively in the same slot of m_all_mpi_win;
//    only the owning rank contributes memory (the requested size, or
//    m_sizeof_type bytes when the requested size is 0), the others contribute 0;
//  - the owning rank copies the smaller of the requested size and the used
//    size from the old segment into the new one;
//  - the old window is freed.
//
// NOTE(review): the line carrying the method name, the statement following the
// `== -1` test (presumably `continue`) and the bodies of the error branches
// after MPI_Win_free(&old_win) are not visible here.
675void MpiMultiMachineShMemWinBaseInternal::
679 MPI_Info_create(&win_info);
680 MPI_Info_set(win_info,
"alloc_shared_noncontig",
"true");
// Every process walks all segments of all ranks: window creation is collective.
683 for (Integer rank = 0; rank < m_comm_machine_size; ++rank) {
684 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
686 const Int32 local_segment_infos_pos = num_seg + rank * m_nb_segments_per_proc;
// -1 means that no reallocation is pending for this segment.
688 if (m_need_resize[local_segment_infos_pos] == -1)
691 ARCCORE_ASSERT(m_need_resize[local_segment_infos_pos] >= 0, (
"New size must be >= 0"));
692 ARCCORE_ASSERT(m_need_resize[local_segment_infos_pos] % m_sizeof_type == 0, (
"New size must be % sizeof type"));
// Only the owner of the segment provides memory for the new window.
696 const Int64 size_seg = (m_comm_machine_rank == rank ? (m_need_resize[local_segment_infos_pos] == 0 ? m_sizeof_type : m_need_resize[local_segment_infos_pos]) : 0);
// Keep the old handle: its slot is overwritten by the allocation below and
// its content is still needed for the copy.
699 MPI_Win old_win = m_all_mpi_win[local_segment_infos_pos];
700 std::byte* ptr_new_seg =
nullptr;
703 int error = MPI_Win_allocate_shared(size_seg, m_sizeof_type, win_info, m_comm_machine, &ptr_new_seg, &m_all_mpi_win[local_segment_infos_pos]);
704 if (error != MPI_SUCCESS) {
705 MPI_Win_free(&old_win);
// The owner moves its data from the old segment to the new one.
710 if (m_comm_machine_rank == rank) {
716 std::byte* ptr_old_seg =
nullptr;
717 MPI_Aint mpi_reserved_size_new_seg;
721 MPI_Aint size_old_seg;
725 error = MPI_Win_shared_query(old_win, m_comm_machine_rank, &size_old_seg, &size_type, &ptr_old_seg);
726 if (error != MPI_SUCCESS || ptr_old_seg ==
nullptr) {
727 MPI_Win_free(&old_win);
734 std::byte* ptr_seg =
nullptr;
// The address returned by the query must match the one returned by the allocation.
738 error = MPI_Win_shared_query(m_all_mpi_win[local_segment_infos_pos], m_comm_machine_rank, &mpi_reserved_size_new_seg, &size_type, &ptr_seg);
739 if (error != MPI_SUCCESS || ptr_seg ==
nullptr || ptr_seg != ptr_new_seg) {
740 MPI_Win_free(&old_win);
// Never copy more than what was used, nor more than the new segment can hold.
748 const Int64 min_size = std::min(m_need_resize[local_segment_infos_pos], m_sizeof_used_part[local_segment_infos_pos]);
750 memcpy(ptr_new_seg, ptr_old_seg, min_size);
753 MPI_Win_free(&old_win);
756 MPI_Info_free(&win_info);
// The windows may have changed: query the local ones again to refresh the
// base address and reserved size of every local segment.
759 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
760 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
764 std::byte* ptr_seg =
nullptr;
765 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], m_comm_machine_rank, &size_seg, &size_type, &ptr_seg);
767 if (error != MPI_SUCCESS) {
771 m_reserved_part_span[num_seg] = Span<std::byte>{ ptr_seg, size_seg };
// Converts the world rank `world` into a rank of the machine communicator by a
// linear search in m_machine_ranks.
// NOTE(review): the statements executed on a match and after the loop are not
// visible here — presumably the matching index is returned and an unknown rank
// is a fatal error; confirm.
778Int32 MpiMultiMachineShMemWinBaseInternal::
779_worldToMachine(Int32 world)
const
781 for (Int32 i = 0; i < m_comm_machine_size; ++i) {
782 if (m_machine_ranks[i] == world) {
792Int32 MpiMultiMachineShMemWinBaseInternal::
793_machineToWorld(Int32 machine)
const
795 return m_machine_ranks[machine];
#define ARCCORE_FATAL(...)
Macro envoyant une exception FatalErrorException.
std::int32_t Int32
Type entier signé sur 32 bits.