15#include "arccore/message_passing_mpi/internal/MpiMultiMachineShMemWinBaseInternal.h"
17#include "arccore/base/FatalErrorException.h"
22namespace Arcane::MessagePassing::Mpi
// Constructor (partial extraction: the signature, error-branch bodies and
// closing braces are missing from this view; all original tokens are kept
// verbatim, including the fused source line numbers).
//
// Sets up the multi-machine shared-memory window:
//  - one MPI shared window per (rank, segment) pair in m_all_mpi_win,
//  - three node-wide bookkeeping windows: m_win_need_resize (pending realloc
//    sizes), m_win_actual_sizeof (used byte count per segment) and
//    m_win_target_segments (cross-segment add requests).
32, m_win_actual_sizeof()
33, m_win_target_segments()
34, m_comm_machine(comm_machine)
35, m_comm_machine_size(comm_machine_size)
36, m_comm_machine_rank(comm_machine_rank)
37, m_sizeof_type(sizeof_type)
38, m_nb_segments_per_proc(nb_segments_per_proc)
39, m_machine_ranks(machine_ranks)
40, m_add_requests(nb_segments_per_proc)
41, m_resize_requests(nb_segments_per_proc)
// Argument validation: element size must be positive and every requested
// segment size must be a non-negative multiple of it (fatal bodies elided
// in this view).
43 if (m_sizeof_type <= 0) {
46 for (
Integer i = 0; i < m_nb_segments_per_proc; ++i) {
47 if (sizeof_segments[i] < 0 || sizeof_segments[i] % m_sizeof_type != 0) {
51 if (m_nb_segments_per_proc <= 0) {
// One MPI window per (rank, segment) pair; spans only for our own segments.
54 m_all_mpi_win.resize(m_comm_machine_size * m_nb_segments_per_proc);
55 m_reserved_part_span.resize(m_nb_segments_per_proc);
// -1 is the "no pending resize request" sentinel used throughout the class.
57 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
59 m_resize_requests[num_seg] = -1;
// Two MPI_Info objects: segments may be laid out non-contiguously
// ("alloc_shared_noncontig"=true), the bookkeeping windows must stay
// contiguous (=false) so rank 0's base pointer addresses the whole array.
62 MPI_Info win_info_true;
63 MPI_Info_create(&win_info_true);
64 MPI_Info_set(win_info_true,
"alloc_shared_noncontig",
"true");
66 MPI_Info win_info_false;
67 MPI_Info_create(&win_info_false);
68 MPI_Info_set(win_info_false,
"alloc_shared_noncontig",
"false");
// Index of our first segment in the flattened (rank-major) layouts.
70 const Int32 pos_my_wins = m_comm_machine_rank * m_nb_segments_per_proc;
// Collective allocation of every segment window. Only the owning rank (i)
// contributes a non-zero size; a zero-sized request is bumped to one element
// so the allocation is never empty.
74 for (
Integer i = 0; i < m_comm_machine_size; ++i) {
75 for (
Integer j = 0; j < m_nb_segments_per_proc; ++j) {
77 if (m_comm_machine_rank == i) {
78 if (sizeof_segments[j] == 0)
79 size_seg = m_sizeof_type;
81 size_seg = sizeof_segments[j];
83 std::byte* ptr_seg =
nullptr;
84 int error = MPI_Win_allocate_shared(size_seg, m_sizeof_type, win_info_true, m_comm_machine, &ptr_seg, &m_all_mpi_win[j + i * m_nb_segments_per_proc]);
86 if (error != MPI_SUCCESS) {
// Query the local base pointer of each of our own segments (error bodies
// and the span assignments are elided in this view).
92 for (
Integer i = 0; i < m_nb_segments_per_proc; ++i) {
95 std::byte* ptr_seg =
nullptr;
96 int error = MPI_Win_shared_query(m_all_mpi_win[i + pos_my_wins], m_comm_machine_rank, &size_seg, &
size_type, &ptr_seg);
98 if (error != MPI_SUCCESS) {
// Bookkeeping window #1: per-segment pending resize sizes (Int64 each),
// contiguous so the rank-0 base pointer spans all ranks' entries.
110 Int64* ptr_seg =
nullptr;
111 Int64* ptr_win =
nullptr;
113 int error = MPI_Win_allocate_shared(
static_cast<Int64>(
sizeof(
Int64)) * m_nb_segments_per_proc,
sizeof(
Int64), win_info_false, m_comm_machine, &ptr_seg, &m_win_need_resize);
115 if (error != MPI_SUCCESS) {
122 int error = MPI_Win_shared_query(m_win_need_resize, 0, &size_seg, &
size_type, &ptr_win);
124 if (error != MPI_SUCCESS) {
128 m_need_resize =
Span<Int64>{ ptr_win, m_comm_machine_size * m_nb_segments_per_proc };
// Our entries start as -1 ("no resize requested").
130 for (
Integer i = 0; i < m_nb_segments_per_proc; ++i) {
131 m_need_resize[i + pos_my_wins] = -1;
// Sanity check: with "alloc_shared_noncontig"=false our slice must sit at
// base + pos_my_wins (fatal body elided).
134 if (ptr_win + pos_my_wins != ptr_seg) {
// Bookkeeping window #2: used byte count of each segment, initialised from
// the caller-provided sizeof_segments.
140 Int64* ptr_seg =
nullptr;
141 Int64* ptr_win =
nullptr;
143 int error = MPI_Win_allocate_shared(
static_cast<Int64>(
sizeof(
Int64)) * m_nb_segments_per_proc,
sizeof(
Int64), win_info_false, m_comm_machine, &ptr_seg, &m_win_actual_sizeof);
145 if (error != MPI_SUCCESS) {
152 int error = MPI_Win_shared_query(m_win_actual_sizeof, 0, &size_seg, &
size_type, &ptr_win);
154 if (error != MPI_SUCCESS) {
158 m_sizeof_used_part =
Span<Int64>{ ptr_win, m_comm_machine_size * m_nb_segments_per_proc };
160 for (
Integer i = 0; i < m_nb_segments_per_proc; ++i) {
161 m_sizeof_used_part[i + pos_my_wins] = sizeof_segments[i];
164 if (ptr_win + pos_my_wins != ptr_seg) {
// Bookkeeping window #3: target segment position of a pending
// addToAnotherSegment request (-1 = none), Int32 per entry.
170 Int32* ptr_seg =
nullptr;
171 Int32* ptr_win =
nullptr;
173 int error = MPI_Win_allocate_shared(
static_cast<Int64>(
sizeof(
Int32)) * m_nb_segments_per_proc,
sizeof(
Int32), win_info_false, m_comm_machine, &ptr_seg, &m_win_target_segments);
175 if (error != MPI_SUCCESS) {
182 int error = MPI_Win_shared_query(m_win_target_segments, 0, &size_seg, &
size_type, &ptr_win);
184 if (error != MPI_SUCCESS) {
188 m_target_segments =
Span<Int32>{ ptr_win, m_comm_machine_size * m_nb_segments_per_proc };
190 for (
Integer i = 0; i < m_nb_segments_per_proc; ++i) {
191 m_target_segments[i + pos_my_wins] = -1;
194 if (ptr_win + pos_my_wins != ptr_seg) {
// Release the info objects and synchronise the node before first use.
199 MPI_Info_free(&win_info_false);
200 MPI_Info_free(&win_info_true);
202 MPI_Barrier(m_comm_machine);
// Destructor: frees every per-segment shared window plus the three
// bookkeeping windows. MPI_Win_free is collective, so all ranks of
// m_comm_machine must reach this destructor together.
208MpiMultiMachineShMemWinBaseInternal::
209~MpiMultiMachineShMemWinBaseInternal()
211 for (
Integer i = 0; i < m_comm_machine_size * m_nb_segments_per_proc; ++i) {
212 MPI_Win_free(&m_all_mpi_win[i]);
214 MPI_Win_free(&m_win_need_resize);
215 MPI_Win_free(&m_win_actual_sizeof);
216 MPI_Win_free(&m_win_target_segments);
// sizeofOneElem(): size in bytes of one element of the window
// (signature elided in this extraction).
225 return m_sizeof_type;
// machineRanks(): world ranks owning a segment in this window
// (signature elided in this extraction).
234 return m_machine_ranks;
// barrier(): node-local synchronisation over the machine communicator
// (signature elided in this extraction).
243 MPI_Barrier(m_comm_machine);
// View of the *used* part of our own segment num_seg: the reserved span is
// trimmed to the current used byte count (signature elided in this view;
// this is one of segmentView/segmentConstView for the local rank).
252 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
253 return m_reserved_part_span[num_seg].subSpan(0, m_sizeof_used_part[segment_infos_pos]);
// View of segment num_seg owned by world rank `rank`: the owner's base
// pointer is obtained with MPI_Win_shared_query, then trimmed to the used
// byte count (signature and fatal body elided in this extraction).
// NOTE(review): `rank` is passed directly as the target of
// MPI_Win_shared_query while the window index uses _worldToMachine(rank) —
// verify both expect the same numbering.
262 const Int32 segment_infos_pos = num_seg + _worldToMachine(rank) * m_nb_segments_per_proc;
266 std::byte* ptr_seg =
nullptr;
267 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], rank, &size_seg, &
size_type, &ptr_seg);
269 if (error != MPI_SUCCESS) {
273 return Span<std::byte>{ ptr_seg, m_sizeof_used_part[segment_infos_pos] };
// Const variant of the local-segment view: same trimming of the reserved
// span to the used byte count (signature elided in this extraction).
282 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
283 return m_reserved_part_span[num_seg].subSpan(0, m_sizeof_used_part[segment_infos_pos]);
// Const variant of the remote-segment view: queries the owner's base
// pointer via MPI_Win_shared_query (signature, fatal body and return
// statement elided in this extraction).
292 const Int32 segment_infos_pos = num_seg + _worldToMachine(rank) * m_nb_segments_per_proc;
296 std::byte* ptr_seg =
nullptr;
297 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], rank, &size_seg, &
size_type, &ptr_seg);
299 if (error != MPI_SUCCESS) {
// requestAdd(num_seg, elem): records a request to append `elem` to our own
// segment. Validates that elem's byte size is a multiple of the element
// size and that elem is non-empty (fatal/early-return bodies elided).
// Posts a realloc request sized for the future content when the reserved
// capacity is insufficient, otherwise a no-op realloc marker so the
// collective realloc stays balanced. The actual copy happens in
// executeAdd(). `elem` must stay alive until then.
312 if (elem.
size() % m_sizeof_type) {
315 if (elem.
empty() || elem.
data() ==
nullptr) {
319 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
321 const Int64 actual_sizeof_win = m_sizeof_used_part[segment_infos_pos];
322 const Int64 future_sizeof_win = actual_sizeof_win + elem.
size();
323 const Int64 old_reserved = m_reserved_part_span[num_seg].size();
325 if (future_sizeof_win > old_reserved) {
326 _requestRealloc(segment_infos_pos, future_sizeof_win);
329 _requestRealloc(segment_infos_pos);
332 m_add_requests[num_seg] = elem;
333 m_add_requested =
true;
// executeAdd(): performs the appends recorded by requestAdd() after the
// collective realloc has run. For each pending request, verifies the
// reserved capacity now covers the future size, copies the bytes element
// by element, then publishes the new used size.
// (Partial extraction: early-return/fatal bodies and closing braces are
// elided; only the format string below was changed.)
344 if (!m_add_requested) {
347 m_add_requested =
false;
349 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
350 if (m_add_requests[num_seg].empty() || m_add_requests[num_seg].data() ==
nullptr) {
354 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
356 const Int64 actual_sizeof_win = m_sizeof_used_part[segment_infos_pos];
357 const Int64 future_sizeof_win = actual_sizeof_win + m_add_requests[num_seg].size();
359 if (m_reserved_part_span[num_seg].size() < future_sizeof_win) {
// Fix: placeholders were {1}/{2} for two arguments; Arccore format
// indices are 0-based (see the identical message in executeResize),
// so {2} referenced a missing third argument.
360 ARCCORE_FATAL(
"Bad realloc -- New size : {0} -- Needed size : {1}", m_reserved_part_span[num_seg].size(), future_sizeof_win);
363 for (
Int64 pos_win = actual_sizeof_win, pos_elem = 0; pos_win < future_sizeof_win; ++pos_win, ++pos_elem) {
364 m_reserved_part_span[num_seg][pos_win] = m_add_requests[num_seg][pos_elem];
366 m_sizeof_used_part[segment_infos_pos] = future_sizeof_win;
// requestAddToAnotherSegment(thread, rank, num_seg, elem): records a
// request to append `elem` to segment num_seg of another process `rank`.
// Validation mirrors requestAdd (fatal bodies elided). The target segment's
// flattened position is published in m_target_segments under our own slot
// indexed by `thread`, so executeAddToAnotherSegment() can detect
// conflicting writers. A realloc request (sized or no-op) is posted on
// behalf of the target segment to keep the collective realloc balanced.
378 if (elem.
size() % m_sizeof_type) {
381 if (elem.
empty() || elem.
data() ==
nullptr) {
385 const Int32 machine_rank = _worldToMachine(rank);
386 const Int32 target_segment_infos_pos = num_seg + machine_rank * m_nb_segments_per_proc;
389 const Int32 segment_infos_pos = thread + m_comm_machine_rank * m_nb_segments_per_proc;
390 m_target_segments[segment_infos_pos] = target_segment_infos_pos;
// Query the target's current reserved span to decide whether a grow is
// needed (fatal body and the span construction are elided in this view).
396 std::byte* ptr_seg =
nullptr;
398 int error = MPI_Win_shared_query(m_all_mpi_win[target_segment_infos_pos], machine_rank, &size_seg, &
size_type, &ptr_seg);
400 if (error != MPI_SUCCESS) {
406 const Int64 actual_sizeof_win = m_sizeof_used_part[target_segment_infos_pos];
407 const Int64 future_sizeof_win = actual_sizeof_win + elem.
size();
408 const Int64 old_reserved = rank_reserved_part_span.
size();
410 if (future_sizeof_win > old_reserved) {
411 _requestRealloc(target_segment_infos_pos, future_sizeof_win);
414 _requestRealloc(target_segment_infos_pos);
// The pending payload is stored under the caller's thread slot.
417 m_add_requests[thread] = elem;
418 m_add_requested =
true;
// executeAddToAnotherSegment(): performs the cross-segment appends recorded
// by requestAddToAnotherSegment(). Flow: barrier; note which of our own
// segments other ranks will edit; detect duplicate writers targeting the
// same segment (fatal); copy each pending payload into the target rank's
// segment via MPI_Win_shared_query; clear the request slots; barrier; then
// refresh our own spans for segments edited by others.
// (Partial extraction: fatal/early-return bodies and closing braces are
// elided; only the two changes commented below were made.)
427 MPI_Barrier(m_comm_machine);
// Fix: this array is indexed by num_seg in [0, m_nb_segments_per_proc)
// (here and in the refresh loop below) but was allocated with
// m_comm_machine_size entries — out of bounds whenever
// nb_segments_per_proc > comm_machine_size. make_unique<bool[]> zero-fills.
429 auto is_my_seg_edited = std::make_unique<bool[]>(m_nb_segments_per_proc);
430 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
431 for (
const Int32 rank_asked : m_target_segments) {
// NOTE(review): m_target_segments stores flattened segment positions
// (num_seg + rank * nb_segments_per_proc, see requestAddToAnotherSegment),
// yet this compares against the machine *rank* — confirm the intended
// comparand isn't num_seg + m_comm_machine_rank * m_nb_segments_per_proc.
432 if (rank_asked == m_comm_machine_rank) {
433 is_my_seg_edited[num_seg] =
true;
439 if (!m_add_requested) {
444 m_add_requested =
false;
// Conflict detection: two ranks must not append to the same target segment.
445 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
446 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
447 const Int32 seg_needs_to_edit = m_target_segments[segment_infos_pos];
448 if (seg_needs_to_edit == -1)
451 bool is_found =
false;
452 for (
const Int32 rank_asked : m_target_segments) {
453 if (rank_asked == seg_needs_to_edit) {
458 ARCCORE_FATAL(
"Two subdomains ask same rank for addToAnotherSegment()");
// Perform the actual appends into the target segments.
466 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
467 if (m_add_requests[num_seg].empty() || m_add_requests[num_seg].data() ==
nullptr) {
471 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
472 const Int32 target_segment_infos_pos = m_target_segments[segment_infos_pos];
473 if (target_segment_infos_pos == -1) {
477 const Int64 actual_sizeof_win = m_sizeof_used_part[target_segment_infos_pos];
478 const Int64 future_sizeof_win = actual_sizeof_win + m_add_requests[num_seg].size();
483 std::byte* ptr_seg =
nullptr;
// target_segment_infos_pos / m_nb_segments_per_proc recovers the owning
// machine rank from the flattened position.
485 int error = MPI_Win_shared_query(m_all_mpi_win[target_segment_infos_pos], target_segment_infos_pos / m_nb_segments_per_proc, &size_seg, &
size_type, &ptr_seg);
487 if (error != MPI_SUCCESS) {
493 if (rank_reserved_part_span.
size() < future_sizeof_win) {
// Fix: placeholders were {1}/{2} for two arguments; Arccore format
// indices are 0-based (see the identical message in executeResize).
494 ARCCORE_FATAL(
"Bad realloc -- New size : {0} -- Needed size : {1}", rank_reserved_part_span.
size(), future_sizeof_win);
497 for (
Int64 pos_win = actual_sizeof_win, pos_elem = 0; pos_win < future_sizeof_win; ++pos_win, ++pos_elem) {
498 rank_reserved_part_span[pos_win] = m_add_requests[num_seg][pos_elem];
500 m_sizeof_used_part[target_segment_infos_pos] = future_sizeof_win;
503 m_target_segments[segment_infos_pos] = -1;
506 MPI_Barrier(m_comm_machine);
// Refresh our cached spans for segments another rank just edited.
508 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
509 if (is_my_seg_edited[num_seg]) {
510 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
513 std::byte* ptr_seg =
nullptr;
515 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], m_comm_machine_rank, &size_seg, &
size_type, &ptr_seg);
517 if (error != MPI_SUCCESS) {
// requestReserve(num_seg, new_capacity): posts a capacity-grow request for
// our segment. The capacity must be a multiple of the element size (fatal
// body elided). If the current reserve already suffices, a no-op realloc
// marker is posted instead so the collective realloc stays balanced.
531 if (new_capacity % m_sizeof_type) {
535 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
537 if (new_capacity <= m_reserved_part_span[num_seg].size()) {
538 _requestRealloc(segment_infos_pos);
541 _requestRealloc(segment_infos_pos, new_capacity);
// requestResize(num_seg, new_size): records a resize of the *used* part of
// our segment. -1 appears to be rejected up front and the size must be a
// non-negative multiple of the element size (fatal bodies elided). Grows
// past the reserve trigger a realloc request; otherwise a no-op marker is
// posted. The used size itself is applied later in executeResize().
559 if (new_size == -1) {
562 if (new_size < 0 || new_size % m_sizeof_type) {
566 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
568 if (new_size > m_reserved_part_span[num_seg].size()) {
569 _requestRealloc(segment_infos_pos, new_size);
572 _requestRealloc(segment_infos_pos);
575 m_resize_requests[num_seg] = new_size;
576 m_resize_requested =
true;
// executeResize(): applies pending requestResize() requests after the
// collective realloc has run: verifies the reserve now covers the new
// size (fatal otherwise), publishes the new used size, and resets the
// request slot to the -1 sentinel.
587 if (!m_resize_requested) {
590 m_resize_requested =
false;
592 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
593 if (m_resize_requests[num_seg] == -1) {
597 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
599 if (m_reserved_part_span[num_seg].size() < m_resize_requests[num_seg]) {
600 ARCCORE_FATAL(
"Bad realloc -- New size : {0} -- Needed size : {1}", m_reserved_part_span[num_seg].size(), m_resize_requests[num_seg]);
603 m_sizeof_used_part[segment_infos_pos] = m_resize_requests[num_seg];
604 m_resize_requests[num_seg] = -1;
// executeShrink(): requests a realloc of every one of our segments down to
// its exact used size (no-op marker when reserve already equals used size),
// releasing the slack reserved memory.
614 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
615 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
617 if (m_reserved_part_span[num_seg].size() == m_sizeof_used_part[segment_infos_pos]) {
618 _requestRealloc(segment_infos_pos);
621 _requestRealloc(segment_infos_pos, m_sizeof_used_part[segment_infos_pos]);
// _requestRealloc(owner_pos_segment, new_capacity): publishes a realloc
// request of new_capacity bytes for the given flattened segment position
// in the node-shared m_need_resize window.
630void MpiMultiMachineShMemWinBaseInternal::
631_requestRealloc(
Int32 owner_pos_segment,
Int64 new_capacity)
const
633 m_need_resize[owner_pos_segment] = new_capacity;
// _requestRealloc(owner_pos_segment): publishes the -1 "no realloc needed"
// marker, used to keep the collective realloc pass balanced across ranks.
639void MpiMultiMachineShMemWinBaseInternal::
640_requestRealloc(
Int32 owner_pos_segment)
const
642 m_need_resize[owner_pos_segment] = -1;
// Collective execution of pending realloc/reserve requests (method name and
// the call performing the realloc are elided in this extraction): barrier,
// process requests, reset our m_need_resize entries to the -1 sentinel,
// barrier again.
648void MpiMultiMachineShMemWinBaseInternal::
653 MPI_Barrier(m_comm_machine);
659 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
660 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
661 m_need_resize[segment_infos_pos] = -1;
669 MPI_Barrier(m_comm_machine);
// Collective realloc worker (method name elided in this extraction).
// For every (rank, segment) pair with a pending size in m_need_resize,
// collectively allocates a replacement shared window (owner passes the new
// size, other ranks pass 0), copies min(new size, used size) bytes from the
// old allocation on the owner, and frees the old window. Finally refreshes
// our cached reserved spans. All ranks must execute the same sequence of
// MPI_Win_allocate_shared / MPI_Win_free calls.
675void MpiMultiMachineShMemWinBaseInternal::
// Segments may be laid out non-contiguously across ranks.
679 MPI_Info_create(&win_info);
680 MPI_Info_set(win_info,
"alloc_shared_noncontig",
"true");
683 for (
Integer rank = 0; rank < m_comm_machine_size; ++rank) {
684 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
686 const Int32 local_segment_infos_pos = num_seg + rank * m_nb_segments_per_proc;
// -1 = no realloc requested for this segment; skip it on every rank so
// the collective calls below stay matched.
688 if (m_need_resize[local_segment_infos_pos] == -1)
691 ARCCORE_ASSERT(m_need_resize[local_segment_infos_pos] >= 0, (
"New size must be >= 0"));
692 ARCCORE_ASSERT(m_need_resize[local_segment_infos_pos] % m_sizeof_type == 0, (
"New size must be % sizeof type"));
// Owner contributes the requested size (bumped to one element when 0);
// every other rank contributes 0 bytes to the collective allocation.
696 const Int64 size_seg = (m_comm_machine_rank == rank ? (m_need_resize[local_segment_infos_pos] == 0 ? m_sizeof_type : m_need_resize[local_segment_infos_pos]) : 0);
699 MPI_Win old_win = m_all_mpi_win[local_segment_infos_pos];
700 std::byte* ptr_new_seg =
nullptr;
703 int error = MPI_Win_allocate_shared(size_seg, m_sizeof_type, win_info, m_comm_machine, &ptr_new_seg, &m_all_mpi_win[local_segment_infos_pos]);
704 if (error != MPI_SUCCESS) {
705 MPI_Win_free(&old_win);
// Only the owning rank migrates the data from the old allocation.
710 if (m_comm_machine_rank == rank) {
716 std::byte* ptr_old_seg =
nullptr;
717 MPI_Aint mpi_reserved_size_new_seg;
721 MPI_Aint size_old_seg;
725 error = MPI_Win_shared_query(old_win, m_comm_machine_rank, &size_old_seg, &size_type, &ptr_old_seg);
726 if (error != MPI_SUCCESS || ptr_old_seg ==
nullptr) {
727 MPI_Win_free(&old_win);
// Sanity check: the queried base of the new window must match the
// pointer returned by MPI_Win_allocate_shared.
734 std::byte* ptr_seg =
nullptr;
738 error = MPI_Win_shared_query(m_all_mpi_win[local_segment_infos_pos], m_comm_machine_rank, &mpi_reserved_size_new_seg, &size_type, &ptr_seg);
739 if (error != MPI_SUCCESS || ptr_seg ==
nullptr || ptr_seg != ptr_new_seg) {
740 MPI_Win_free(&old_win);
// Copy only the bytes that fit in both old used part and new capacity
// (shrinking truncates the tail).
748 const Int64 min_size = std::min(m_need_resize[local_segment_infos_pos], m_sizeof_used_part[local_segment_infos_pos]);
750 memcpy(ptr_new_seg, ptr_old_seg, min_size);
// Collective release of the superseded window.
753 MPI_Win_free(&old_win);
756 MPI_Info_free(&win_info);
// Refresh our cached reserved spans from the (possibly new) windows.
759 for (
Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
760 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
764 std::byte* ptr_seg =
nullptr;
765 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], m_comm_machine_rank, &size_seg, &size_type, &ptr_seg);
767 if (error != MPI_SUCCESS) {
771 m_reserved_part_span[num_seg] = Span<std::byte>{ ptr_seg, size_seg };
// _worldToMachine(world): linear search mapping a world rank to its index
// in m_machine_ranks (the machine-communicator rank). The return inside
// the match branch and the not-found handling are elided in this view.
792Int32 MpiMultiMachineShMemWinBaseInternal::
793_worldToMachine(
Int32 world)
const
781 for (
Int32 i = 0; i < m_comm_machine_size; ++i) {
782 if (m_machine_ranks[i] == world) {
// _machineToWorld(machine): inverse mapping — direct lookup of the world
// rank stored for a machine-communicator rank. No bounds check visible;
// callers must pass machine in [0, m_comm_machine_size).
792Int32 MpiMultiMachineShMemWinBaseInternal::
793_machineToWorld(
Int32 machine)
const
795 return m_machine_ranks[machine];
#define ARCCORE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Vue constante d'un tableau de type T.
void requestAdd(Int32 num_seg, Span< const std::byte > elem)
Méthode permettant de demander l'ajout d'éléments dans l'un de nos segments.
Span< const std::byte > segmentConstView(Int32 num_seg) const
Méthode permettant d'obtenir une vue sur l'un de nos segments.
void executeResize()
Méthode permettant d'exécuter les requêtes de redimensionnement.
void executeShrink()
Méthode permettant de réduire l'espace mémoire réservé pour les segments au minimum nécessaire.
void requestReserve(Int32 num_seg, Int64 new_capacity)
Méthode permettant de demander la réservation d'espace mémoire pour un de nos segments.
void requestAddToAnotherSegment(Int32 thread, Int32 rank, Int32 num_seg, Span< const std::byte > elem)
Méthode permettant de demander l'ajout d'éléments dans un des segments de la fenêtre.
void executeReserve()
Méthode permettant d'exécuter les requêtes de réservation.
MpiMultiMachineShMemWinBaseInternal(SmallSpan< Int64 > sizeof_segments, Int32 nb_segments_per_proc, Int32 sizeof_type, const MPI_Comm &comm_machine, Int32 comm_machine_rank, Int32 comm_machine_size, ConstArrayView< Int32 > machine_ranks)
Le sizeof_segments ne doit pas être conservé !
void executeAdd()
Méthode permettant d'exécuter les requêtes d'ajout.
void requestResize(Int32 num_seg, Int64 new_size)
Méthode permettant de demander le redimensionnement d'un de nos segments.
void barrier() const
Méthode permettant d'attendre que tous les processus du noeud appellent cette méthode pour continuer ...
void executeAddToAnotherSegment()
Méthode permettant d'exécuter les requêtes d'ajout dans les segments d'autres processus.
Int32 sizeofOneElem() const
Méthode permettant d'obtenir la taille d'un élement de la fenêtre.
Span< std::byte > segmentView(Int32 num_seg)
Méthode permettant d'obtenir une vue sur l'un de nos segments.
ConstArrayView< Int32 > machineRanks() const
Méthode permettant d'obtenir les rangs qui possèdent un segment dans la fenêtre.
Vue d'un tableau d'éléments de type T.
constexpr __host__ __device__ pointer data() const noexcept
Pointeur sur le début de la vue.
constexpr __host__ __device__ bool empty() const noexcept
Retourne true si le tableau est vide (dimension nulle)
constexpr __host__ __device__ SizeType size() const noexcept
Retourne la taille du tableau.
Vue d'un tableau d'éléments de type T.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
std::int32_t Int32
Type entier signé sur 32 bits.