Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
MpiMultiMachineShMemWinBaseInternal.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiMultiMachineShMemWinBaseInternal.cc (C) 2000-2026 */
9/* */
10/* Class allowing the creation of memory windows for a compute node. */
11/* The segments of these windows are not contiguous in memory and can */
12/* be resized. A process can possess multiple segments. */
13/*---------------------------------------------------------------------------*/
14/*---------------------------------------------------------------------------*/
15
16#include "arccore/message_passing_mpi/internal/MpiMultiMachineShMemWinBaseInternal.h"
17
18#include "arccore/base/FatalErrorException.h"
19
20/*---------------------------------------------------------------------------*/
21/*---------------------------------------------------------------------------*/
22
23namespace Arcane::MessagePassing::Mpi
24{
25
26/*---------------------------------------------------------------------------*/
27/*---------------------------------------------------------------------------*/
28
31MpiMultiMachineShMemWinBaseInternal(SmallSpan<Int64> sizeof_segments, Int32 nb_segments_per_proc, Int32 sizeof_type, const MPI_Comm& comm_machine, Int32 comm_machine_rank, Int32 comm_machine_size, ConstArrayView<Int32> machine_ranks)
35, m_comm_machine(comm_machine)
36, m_comm_machine_size(comm_machine_size)
37, m_comm_machine_rank(comm_machine_rank)
38, m_sizeof_type(sizeof_type)
39, m_nb_segments_per_proc(nb_segments_per_proc)
40, m_machine_ranks(machine_ranks)
41, m_add_requests(nb_segments_per_proc)
42, m_resize_requests(nb_segments_per_proc)
43{
44 if (m_sizeof_type <= 0) {
45 ARCCORE_FATAL("Invalid sizeof_type");
46 }
47 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
48 if (sizeof_segments[i] < 0 || sizeof_segments[i] % m_sizeof_type != 0) {
49 ARCCORE_FATAL("Invalid initial sizeof_segment");
50 }
51 }
52 if (m_nb_segments_per_proc <= 0) {
53 ARCCORE_FATAL("Invalid nb_segments_per_proc");
54 }
55 m_all_mpi_win.resize(m_comm_machine_size * m_nb_segments_per_proc);
56 m_reserved_part_span.resize(m_nb_segments_per_proc);
57
58 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
59 m_add_requests[num_seg] = Span<const std::byte>{ nullptr, 0 };
60 m_resize_requests[num_seg] = -1;
61 }
62
63 MPI_Info win_info_true;
64 MPI_Info_create(&win_info_true);
65 MPI_Info_set(win_info_true, "alloc_shared_noncontig", "true");
66
67 MPI_Info win_info_false;
68 MPI_Info_create(&win_info_false);
69 MPI_Info_set(win_info_false, "alloc_shared_noncontig", "false");
70
71 const Int32 pos_my_wins = m_comm_machine_rank * m_nb_segments_per_proc;
72
73 {
74 // We create all segments for all processes.
75 for (Integer i = 0; i < m_comm_machine_size; ++i) {
76 for (Integer j = 0; j < m_nb_segments_per_proc; ++j) {
77 Int64 size_seg = 0;
78 if (m_comm_machine_rank == i) {
79 if (sizeof_segments[j] == 0)
80 size_seg = m_sizeof_type;
81 else
82 size_seg = sizeof_segments[j];
83 }
84 std::byte* ptr_seg = nullptr;
85 int error = MPI_Win_allocate_shared(size_seg, m_sizeof_type, win_info_true, m_comm_machine, &ptr_seg, &m_all_mpi_win[j + i * m_nb_segments_per_proc]);
86
87 if (error != MPI_SUCCESS) {
88 ARCCORE_FATAL("Error with MPI_Win_allocate_shared() call");
89 }
90 }
91 }
92
93 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
94 MPI_Aint size_seg;
95 int size_type;
96 std::byte* ptr_seg = nullptr;
97 int error = MPI_Win_shared_query(m_all_mpi_win[i + pos_my_wins], m_comm_machine_rank, &size_seg, &size_type, &ptr_seg);
98
99 if (error != MPI_SUCCESS) {
100 ARCCORE_FATAL("Error with MPI_Win_shared_query() call");
101 }
102
103 // Attention: The user requests a minimum number of reserved elements.
104 // But MPI reserves the size it wants (effect of alloc_shared_noncontig=true).
105 // We are just sure that the size it reserved is greater than or equal to sizeof_segment.
106 m_reserved_part_span[i] = Span<std::byte>{ ptr_seg, size_seg };
107 }
108 }
109
110 {
111 Int64* ptr_seg = nullptr;
112 Int64* ptr_win = nullptr;
113 {
114 int error = MPI_Win_allocate_shared(static_cast<Int64>(sizeof(Int64)) * m_nb_segments_per_proc, sizeof(Int64), win_info_false, m_comm_machine, &ptr_seg, &m_win_need_resize);
115
116 if (error != MPI_SUCCESS) {
117 ARCCORE_FATAL("Error with MPI_Win_allocate_shared() call");
118 }
119 }
120 {
121 MPI_Aint size_seg;
122 int size_type;
123 int error = MPI_Win_shared_query(m_win_need_resize, 0, &size_seg, &size_type, &ptr_win);
124
125 if (error != MPI_SUCCESS) {
126 ARCCORE_FATAL("Error with MPI_Win_shared_query() call");
127 }
128
129 m_need_resize = Span<Int64>{ ptr_win, m_comm_machine_size * m_nb_segments_per_proc };
130
131 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
132 m_need_resize[i + pos_my_wins] = -1;
133 }
134 }
135 if (ptr_win + pos_my_wins != ptr_seg) {
136 ARCCORE_FATAL("m_win_need_resize is noncontig");
137 }
138 }
139
140 {
141 Int64* ptr_seg = nullptr;
142 Int64* ptr_win = nullptr;
143 {
144 int error = MPI_Win_allocate_shared(static_cast<Int64>(sizeof(Int64)) * m_nb_segments_per_proc, sizeof(Int64), win_info_false, m_comm_machine, &ptr_seg, &m_win_actual_sizeof);
145
146 if (error != MPI_SUCCESS) {
147 ARCCORE_FATAL("Error with MPI_Win_allocate_shared() call");
148 }
149 }
150 {
151 MPI_Aint size_seg;
152 int size_type;
153 int error = MPI_Win_shared_query(m_win_actual_sizeof, 0, &size_seg, &size_type, &ptr_win);
154
155 if (error != MPI_SUCCESS) {
156 ARCCORE_FATAL("Error with MPI_Win_shared_query() call");
157 }
158
159 m_sizeof_used_part = Span<Int64>{ ptr_win, m_comm_machine_size * m_nb_segments_per_proc };
160
161 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
162 m_sizeof_used_part[i + pos_my_wins] = sizeof_segments[i];
163 }
164 }
165 if (ptr_win + pos_my_wins != ptr_seg) {
166 ARCCORE_FATAL("m_win_actual_sizeof is noncontig");
167 }
168 }
169
170 {
171 Int32* ptr_seg = nullptr;
172 Int32* ptr_win = nullptr;
173 {
174 int error = MPI_Win_allocate_shared(static_cast<Int64>(sizeof(Int32)) * m_nb_segments_per_proc, sizeof(Int32), win_info_false, m_comm_machine, &ptr_seg, &m_win_target_segments);
175
176 if (error != MPI_SUCCESS) {
177 ARCCORE_FATAL("Error with MPI_Win_allocate_shared() call");
178 }
179 }
180 {
181 MPI_Aint size_seg;
182 int size_type;
183 int error = MPI_Win_shared_query(m_win_target_segments, 0, &size_seg, &size_type, &ptr_win);
184
185 if (error != MPI_SUCCESS) {
186 ARCCORE_FATAL("Error with MPI_Win_shared_query() call");
187 }
188
189 m_target_segments = Span<Int32>{ ptr_win, m_comm_machine_size * m_nb_segments_per_proc };
190
191 for (Integer i = 0; i < m_nb_segments_per_proc; ++i) {
192 m_target_segments[i + pos_my_wins] = -1;
193 }
194 }
195 if (ptr_win + pos_my_wins != ptr_seg) {
196 ARCCORE_FATAL("m_win_owner_segments is noncontig");
197 }
198 }
199
200 MPI_Info_free(&win_info_false);
201 MPI_Info_free(&win_info_true);
202
203 MPI_Barrier(m_comm_machine);
204}
205
206/*---------------------------------------------------------------------------*/
207/*---------------------------------------------------------------------------*/
208
209MpiMultiMachineShMemWinBaseInternal::
210~MpiMultiMachineShMemWinBaseInternal()
211{
212 for (Integer i = 0; i < m_comm_machine_size * m_nb_segments_per_proc; ++i) {
213 MPI_Win_free(&m_all_mpi_win[i]);
214 }
215 MPI_Win_free(&m_win_need_resize);
216 MPI_Win_free(&m_win_actual_sizeof);
217 MPI_Win_free(&m_win_target_segments);
218}
219
220/*---------------------------------------------------------------------------*/
221/*---------------------------------------------------------------------------*/
222
224sizeofOneElem() const
225{
226 return m_sizeof_type;
227}
228
229/*---------------------------------------------------------------------------*/
230/*---------------------------------------------------------------------------*/
231
233machineRanks() const
234{
235 return m_machine_ranks;
236}
237
238/*---------------------------------------------------------------------------*/
239/*---------------------------------------------------------------------------*/
240
242barrier() const
243{
244 MPI_Barrier(m_comm_machine);
245}
246
247/*---------------------------------------------------------------------------*/
248/*---------------------------------------------------------------------------*/
249
251segmentView(Int32 num_seg)
252{
253 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
254 return m_reserved_part_span[num_seg].subSpan(0, m_sizeof_used_part[segment_infos_pos]);
255}
256
257/*---------------------------------------------------------------------------*/
258/*---------------------------------------------------------------------------*/
259
261segmentView(Int32 rank, Int32 num_seg)
262{
263 const Int32 segment_infos_pos = num_seg + _worldToMachine(rank) * m_nb_segments_per_proc;
264
265 MPI_Aint size_seg;
266 int size_type;
267 std::byte* ptr_seg = nullptr;
268 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], rank, &size_seg, &size_type, &ptr_seg);
269
270 if (error != MPI_SUCCESS) {
271 ARCCORE_FATAL("Error with MPI_Win_shared_query() call");
272 }
273
274 return Span<std::byte>{ ptr_seg, m_sizeof_used_part[segment_infos_pos] };
275}
276
277/*---------------------------------------------------------------------------*/
278/*---------------------------------------------------------------------------*/
279
281segmentConstView(Int32 num_seg) const
282{
283 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
284 return m_reserved_part_span[num_seg].subSpan(0, m_sizeof_used_part[segment_infos_pos]);
285}
286
287/*---------------------------------------------------------------------------*/
288/*---------------------------------------------------------------------------*/
289
291segmentConstView(Int32 rank, Int32 num_seg) const
292{
293 const Int32 segment_infos_pos = num_seg + _worldToMachine(rank) * m_nb_segments_per_proc;
294
295 MPI_Aint size_seg;
296 int size_type;
297 std::byte* ptr_seg = nullptr;
298 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], rank, &size_seg, &size_type, &ptr_seg);
299
300 if (error != MPI_SUCCESS) {
301 ARCCORE_FATAL("Error with MPI_Win_shared_query() call");
302 }
303
304 return Span<const std::byte>{ ptr_seg, m_sizeof_used_part[segment_infos_pos] };
305}
306
307/*---------------------------------------------------------------------------*/
308/*---------------------------------------------------------------------------*/
309
312{
313 if (elem.size() % m_sizeof_type) {
314 ARCCORE_FATAL("Sizeof elem not valid");
315 }
316 if (elem.empty() || elem.data() == nullptr) {
317 return;
318 }
319
320 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
321
322 const Int64 actual_sizeof_win = m_sizeof_used_part[segment_infos_pos];
323 const Int64 future_sizeof_win = actual_sizeof_win + elem.size();
324 const Int64 old_reserved = m_reserved_part_span[num_seg].size();
325
326 if (future_sizeof_win > old_reserved) {
327 _requestRealloc(segment_infos_pos, future_sizeof_win);
328 }
329 else {
330 _requestRealloc(segment_infos_pos);
331 }
332
333 m_add_requests[num_seg] = elem;
334 m_add_requested = true; // TODO Atomic ?
335}
336
337/*---------------------------------------------------------------------------*/
338/*---------------------------------------------------------------------------*/
339
342{
343 _executeRealloc();
344
345 if (!m_add_requested) {
346 return;
347 }
348 m_add_requested = false;
349
350 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
351 if (m_add_requests[num_seg].empty() || m_add_requests[num_seg].data() == nullptr) {
352 continue;
353 }
354
355 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
356
357 const Int64 actual_sizeof_win = m_sizeof_used_part[segment_infos_pos];
358 const Int64 future_sizeof_win = actual_sizeof_win + m_add_requests[num_seg].size();
359
360 if (m_reserved_part_span[num_seg].size() < future_sizeof_win) {
361 ARCCORE_FATAL("Bad realloc -- New size : {1} -- Needed size : {2}", m_reserved_part_span[num_seg].size(), future_sizeof_win);
362 }
363
364 for (Int64 pos_win = actual_sizeof_win, pos_elem = 0; pos_win < future_sizeof_win; ++pos_win, ++pos_elem) {
365 m_reserved_part_span[num_seg][pos_win] = m_add_requests[num_seg][pos_elem];
366 }
367 m_sizeof_used_part[segment_infos_pos] = future_sizeof_win;
368
369 m_add_requests[num_seg] = Span<const std::byte>{ nullptr, 0 };
370 }
371}
372
373/*---------------------------------------------------------------------------*/
374/*---------------------------------------------------------------------------*/
375
378{
379 if (elem.size() % m_sizeof_type) {
380 ARCCORE_FATAL("Sizeof elem not valid");
381 }
382 if (elem.empty() || elem.data() == nullptr) {
383 return;
384 }
385
386 const Int32 machine_rank = _worldToMachine(rank);
387 const Int32 target_segment_infos_pos = num_seg + machine_rank * m_nb_segments_per_proc;
388
389 {
390 const Int32 segment_infos_pos = thread + m_comm_machine_rank * m_nb_segments_per_proc;
391 m_target_segments[segment_infos_pos] = target_segment_infos_pos;
392 }
393
394 Span<std::byte> rank_reserved_part_span;
395 {
396 MPI_Aint size_seg;
397 std::byte* ptr_seg = nullptr;
398 int size_type;
399 int error = MPI_Win_shared_query(m_all_mpi_win[target_segment_infos_pos], machine_rank, &size_seg, &size_type, &ptr_seg);
400
401 if (error != MPI_SUCCESS) {
402 ARCCORE_FATAL("Error with MPI_Win_shared_query() call");
403 }
404 rank_reserved_part_span = Span<std::byte>{ ptr_seg, size_seg };
405 }
406
407 const Int64 actual_sizeof_win = m_sizeof_used_part[target_segment_infos_pos];
408 const Int64 future_sizeof_win = actual_sizeof_win + elem.size();
409 const Int64 old_reserved = rank_reserved_part_span.size();
410
411 if (future_sizeof_win > old_reserved) {
412 _requestRealloc(target_segment_infos_pos, future_sizeof_win);
413 }
414 else {
415 _requestRealloc(target_segment_infos_pos);
416 }
417
418 m_add_requests[thread] = elem;
419 m_add_requested = true; // TODO Atomic ?
420}
421
422/*---------------------------------------------------------------------------*/
423/*---------------------------------------------------------------------------*/
424
427{
428 MPI_Barrier(m_comm_machine);
429
430 // For each segment belonging to my process, I check if someone
431 // wants to modify it.
432 // is_my_seg_edited will be used at the end of the method.
433 auto is_my_seg_edited = std::make_unique<bool[]>(m_comm_machine_size);
434 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
435 for (const Int32 rank_asked : m_target_segments) {
436 if (rank_asked == m_comm_machine_rank) {
437 is_my_seg_edited[num_seg] = true;
438 break;
439 }
440 }
441 }
442
443 // If I have not requested a segment modification, I only perform
444 // the reallocations if needed (other processes may request reallocations from me).
445 if (!m_add_requested) {
446 _executeRealloc();
447 }
448
449 else {
450 m_add_requested = false;
451
452 // A segment can only be modified by one thread at a time. We check this here:
453 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
454 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
455 const Int32 seg_needs_to_edit = m_target_segments[segment_infos_pos];
456 if (seg_needs_to_edit == -1)
457 continue;
458
459 bool is_found = false;
460 for (const Int32 rank_asked : m_target_segments) {
461 if (rank_asked == seg_needs_to_edit) {
462 if (!is_found) {
463 is_found = true;
464 }
465 else {
466 ARCCORE_FATAL("Two subdomains ask same rank for addToAnotherSegment()");
467 }
468 }
469 }
470 }
471
472 _executeRealloc();
473
474 // We add the elements into the segments.
475 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
476 if (m_add_requests[num_seg].empty() || m_add_requests[num_seg].data() == nullptr) {
477 continue;
478 }
479
480 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
481 const Int32 target_segment_infos_pos = m_target_segments[segment_infos_pos];
482 if (target_segment_infos_pos == -1) {
483 ARCCORE_FATAL("Should not go here");
484 }
485
486 const Int64 actual_sizeof_win = m_sizeof_used_part[target_segment_infos_pos];
487 const Int64 future_sizeof_win = actual_sizeof_win + m_add_requests[num_seg].size();
488
489 // We retrieve the segment to modify.
490 Span<std::byte> rank_reserved_part_span;
491 {
492 MPI_Aint size_seg;
493 std::byte* ptr_seg = nullptr;
494 int size_type;
495 int error = MPI_Win_shared_query(m_all_mpi_win[target_segment_infos_pos], target_segment_infos_pos / m_nb_segments_per_proc, &size_seg, &size_type, &ptr_seg);
496
497 if (error != MPI_SUCCESS) {
498 ARCCORE_FATAL("Error with MPI_Win_shared_query() call");
499 }
500 rank_reserved_part_span = Span<std::byte>{ ptr_seg, size_seg };
501 }
502
503 if (rank_reserved_part_span.size() < future_sizeof_win) {
504 ARCCORE_FATAL("Bad realloc -- New size : {1} -- Needed size : {2}", rank_reserved_part_span.size(), future_sizeof_win);
505 }
506
507 // We modify it with the elements given in the requestAddToAnotherSegment() method.
508 for (Int64 pos_win = actual_sizeof_win, pos_elem = 0; pos_win < future_sizeof_win; ++pos_win, ++pos_elem) {
509 rank_reserved_part_span[pos_win] = m_add_requests[num_seg][pos_elem];
510 }
511 m_sizeof_used_part[target_segment_infos_pos] = future_sizeof_win;
512
513 m_add_requests[num_seg] = Span<const std::byte>{ nullptr, 0 };
514 m_target_segments[segment_infos_pos] = -1;
515 }
516 }
517 MPI_Barrier(m_comm_machine);
518
519 // Since others may have modified our segments, we recreate the views.
520 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
521 if (is_my_seg_edited[num_seg]) {
522 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
523
524 MPI_Aint size_seg;
525 std::byte* ptr_seg = nullptr;
526 int size_type;
527 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], m_comm_machine_rank, &size_seg, &size_type, &ptr_seg);
528
529 if (error != MPI_SUCCESS) {
530 ARCCORE_FATAL("Error with MPI_Win_shared_query() call");
531 }
532 m_reserved_part_span[num_seg] = Span<std::byte>{ ptr_seg, size_seg };
533 }
534 }
535}
536
537/*---------------------------------------------------------------------------*/
538/*---------------------------------------------------------------------------*/
539
541requestReserve(Int32 num_seg, Int64 new_capacity)
542{
543 if (new_capacity % m_sizeof_type) {
544 ARCCORE_FATAL("new_capacity not valid");
545 }
546
547 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
548
549 if (new_capacity > m_reserved_part_span[num_seg].size()) {
550 _requestRealloc(segment_infos_pos, new_capacity);
551 }
552 else {
553 _requestRealloc(segment_infos_pos);
554 }
555}
556
557/*---------------------------------------------------------------------------*/
558/*---------------------------------------------------------------------------*/
559
562{
563 _executeRealloc();
564}
565
566/*---------------------------------------------------------------------------*/
567/*---------------------------------------------------------------------------*/
568
570requestResize(Int32 num_seg, Int64 new_size)
571{
572 if (new_size == -1) {
573 return;
574 }
575 if (new_size < 0 || new_size % m_sizeof_type) {
576 ARCCORE_FATAL("new_size not valid");
577 }
578
579 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
580
581 if (new_size > m_reserved_part_span[num_seg].size()) {
582 _requestRealloc(segment_infos_pos, new_size);
583 }
584 else {
585 _requestRealloc(segment_infos_pos);
586 }
587
588 m_resize_requests[num_seg] = new_size;
589 m_resize_requested = true; // TODO Atomic ?
590}
591
592/*---------------------------------------------------------------------------*/
593/*---------------------------------------------------------------------------*/
594
597{
598 _executeRealloc();
599
600 if (!m_resize_requested) {
601 return;
602 }
603 m_resize_requested = false;
604
605 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
606 if (m_resize_requests[num_seg] == -1) {
607 continue;
608 }
609
610 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
611
612 if (m_reserved_part_span[num_seg].size() < m_resize_requests[num_seg]) {
613 ARCCORE_FATAL("Bad realloc -- New size : {0} -- Needed size : {1}", m_reserved_part_span[num_seg].size(), m_resize_requests[num_seg]);
614 }
615
616 m_sizeof_used_part[segment_infos_pos] = m_resize_requests[num_seg];
617 m_resize_requests[num_seg] = -1;
618 }
619}
620
621/*---------------------------------------------------------------------------*/
622/*---------------------------------------------------------------------------*/
623
626{
627 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
628 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
629
630 if (m_reserved_part_span[num_seg].size() == m_sizeof_used_part[segment_infos_pos]) {
631 _requestRealloc(segment_infos_pos);
632 }
633 else {
634 _requestRealloc(segment_infos_pos, m_sizeof_used_part[segment_infos_pos]);
635 }
636 }
637 _executeRealloc();
638}
639
640/*---------------------------------------------------------------------------*/
641/*---------------------------------------------------------------------------*/
642
644_requestRealloc(Int32 owner_pos_segment, Int64 new_capacity) const
645{
646 m_need_resize[owner_pos_segment] = new_capacity;
647}
648
649/*---------------------------------------------------------------------------*/
650/*---------------------------------------------------------------------------*/
651
653_requestRealloc(Int32 owner_pos_segment) const
654{
655 m_need_resize[owner_pos_segment] = -1;
656}
657
658/*---------------------------------------------------------------------------*/
659/*---------------------------------------------------------------------------*/
660
661void MpiMultiMachineShMemWinBaseInternal::
662_executeRealloc()
663{
664 // Important barrier because everyone must know that we must
665 // resize one of the segments we own.
666 MPI_Barrier(m_comm_machine);
667
668 // No need for a barrier because MPI_Win_allocate_shared() in _realloc() is
669 // blocking.
670 _realloc();
671
672 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
673 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
674 m_need_resize[segment_infos_pos] = -1;
675 }
676
677 // Important barrier in case an MPI_Win_shared_query() from
678 // _reallocCollective() takes too long (another process could
679 // call this method and set m_need_resize[m_owner_segment] to
680 // true => deadlock in _reallocCollective() on MPI_Win_allocate_shared()
681 // due to the continue).
682 MPI_Barrier(m_comm_machine);
683}
684
685/*---------------------------------------------------------------------------*/
686/*---------------------------------------------------------------------------*/
687
688void MpiMultiMachineShMemWinBaseInternal::
689_realloc()
690{
691 MPI_Info win_info;
692 MPI_Info_create(&win_info);
693 MPI_Info_set(win_info, "alloc_shared_noncontig", "true");
694
695 // Everyone reallocates their segments, if requested.
696 for (Integer rank = 0; rank < m_comm_machine_size; ++rank) {
697 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
698
699 const Int32 local_segment_infos_pos = num_seg + rank * m_nb_segments_per_proc;
700
701 if (m_need_resize[local_segment_infos_pos] == -1)
702 continue;
703
704 ARCCORE_ASSERT(m_need_resize[local_segment_infos_pos] >= 0, ("New size must be >= 0"));
705 ARCCORE_ASSERT(m_need_resize[local_segment_infos_pos] % m_sizeof_type == 0, ("New size must be % sizeof type"));
706
707 // If we must realloc our segment, we allocate at least a size of m_sizeof_type.
708 // If it is not our segment, size 0 so that MPI allocates nothing.
709 const Int64 size_seg = (m_comm_machine_rank == rank ? (m_need_resize[local_segment_infos_pos] == 0 ? m_sizeof_type : m_need_resize[local_segment_infos_pos]) : 0);
710
711 // We save the old segment to move the data.
712 MPI_Win old_win = m_all_mpi_win[local_segment_infos_pos];
713 std::byte* ptr_new_seg = nullptr;
714
715 // If size_seg == 0 then ptr_seg == nullptr.
716 int error = MPI_Win_allocate_shared(size_seg, m_sizeof_type, win_info, m_comm_machine, &ptr_new_seg, &m_all_mpi_win[local_segment_infos_pos]);
717 if (error != MPI_SUCCESS) {
718 MPI_Win_free(&old_win);
719 ARCCORE_FATAL("Error with MPI_Win_allocate_shared() call");
720 }
721
722 // We only move the data if it is our segment.
723 if (m_comm_machine_rank == rank) {
724 // We need two additional pieces of information:
725 // - the pointer to the old segment (not possible to retrieve
726 // via m_reserved_part_span due to exchanges),
727 // - the size of the new segment (MPI may allocate more than the size
728 // we requested).
729 std::byte* ptr_old_seg = nullptr;
730 MPI_Aint mpi_reserved_size_new_seg;
731
732 // Old segment.
733 {
734 MPI_Aint size_old_seg;
735 int size_type;
736 // Here, ptr_seg is never == nullptr since we always create a
737 // segment of at least m_sizeof_type size.
738 error = MPI_Win_shared_query(old_win, m_comm_machine_rank, &size_old_seg, &size_type, &ptr_old_seg);
739 if (error != MPI_SUCCESS || ptr_old_seg == nullptr) {
740 MPI_Win_free(&old_win);
741 ARCCORE_FATAL("Error with MPI_Win_shared_query() call");
742 }
743 }
744
745 // New segment.
746 {
747 std::byte* ptr_seg = nullptr;
748 int size_type;
749 // Here, ptr_seg is never == nullptr since we always create a
750 // segment of at least m_sizeof_type size.
751 error = MPI_Win_shared_query(m_all_mpi_win[local_segment_infos_pos], m_comm_machine_rank, &mpi_reserved_size_new_seg, &size_type, &ptr_seg);
752 if (error != MPI_SUCCESS || ptr_seg == nullptr || ptr_seg != ptr_new_seg) {
753 MPI_Win_free(&old_win);
754 ARCCORE_FATAL("Error with MPI_Win_shared_query() call");
755 }
756 }
757
758 // If the realloc is a reduction in segment size (user
759 // used space, if resize for example), we cannot copy all the old data.
760 const Int64 min_size = std::min(m_need_resize[local_segment_infos_pos], m_sizeof_used_part[local_segment_infos_pos]);
761
762 memcpy(ptr_new_seg, ptr_old_seg, min_size);
763 }
764
765 MPI_Win_free(&old_win);
766 }
767 }
768 MPI_Info_free(&win_info);
769
770 // We reconstruct the spans of the segments we own.
771 for (Integer num_seg = 0; num_seg < m_nb_segments_per_proc; ++num_seg) {
772 const Int32 segment_infos_pos = num_seg + m_comm_machine_rank * m_nb_segments_per_proc;
773
774 MPI_Aint size_seg;
775 int size_type;
776 std::byte* ptr_seg = nullptr;
777 int error = MPI_Win_shared_query(m_all_mpi_win[segment_infos_pos], m_comm_machine_rank, &size_seg, &size_type, &ptr_seg);
778
779 if (error != MPI_SUCCESS) {
780 ARCCORE_FATAL("Error with MPI_Win_shared_query() call");
781 }
782
783 m_reserved_part_span[num_seg] = Span<std::byte>{ ptr_seg, size_seg };
784 }
785}
786
787/*---------------------------------------------------------------------------*/
788/*---------------------------------------------------------------------------*/
789
790Int32 MpiMultiMachineShMemWinBaseInternal::
791_worldToMachine(Int32 world) const
792{
793 for (Int32 i = 0; i < m_comm_machine_size; ++i) {
794 if (m_machine_ranks[i] == world) {
795 return i;
796 }
797 }
798 ARCCORE_FATAL("Rank is not in machine");
799}
800
801/*---------------------------------------------------------------------------*/
802/*---------------------------------------------------------------------------*/
803
804Int32 MpiMultiMachineShMemWinBaseInternal::
805_machineToWorld(Int32 machine) const
806{
807 return m_machine_ranks[machine];
808}
809
810/*---------------------------------------------------------------------------*/
811/*---------------------------------------------------------------------------*/
812
813} // namespace Arcane::MessagePassing::Mpi
814
815/*---------------------------------------------------------------------------*/
816/*---------------------------------------------------------------------------*/
#define ARCCORE_FATAL(...)
Macro throwing a FatalErrorException.
Constant view of an array of type T.
void requestAdd(Int32 num_seg, Span< const std::byte > elem)
Method to request the addition of elements into one of our segments.
Span< const std::byte > segmentConstView(Int32 num_seg) const
Method to get a constant view of one of our segments.
void executeShrink()
Method to reduce the reserved memory space for the segments to the minimum necessary.
void requestReserve(Int32 num_seg, Int64 new_capacity)
Method to request the reservation of memory space for one of our segments.
void requestAddToAnotherSegment(Int32 thread, Int32 rank, Int32 num_seg, Span< const std::byte > elem)
Method to request the addition of elements into one of the segments of the window.
void _requestRealloc(Int32 owner_pos_segment, Int64 new_capacity) const
Method to request a reallocation.
MpiMultiMachineShMemWinBaseInternal(SmallSpan< Int64 > sizeof_segments, Int32 nb_segments_per_proc, Int32 sizeof_type, const MPI_Comm &comm_machine, Int32 comm_machine_rank, Int32 comm_machine_size, ConstArrayView< Int32 > machine_ranks)
sizeof_segments is not retained by the object: it is only read during construction and may be released by the caller afterwards.
void requestResize(Int32 num_seg, Int64 new_size)
Method to request the resizing of one of our segments.
void barrier() const
Method that blocks until every process on the node has called it.
void executeAddToAnotherSegment()
Method to execute the addition requests in the segments of other processes.
Int32 sizeofOneElem() const
Method to get the size of an element in the window.
Span< std::byte > segmentView(Int32 num_seg)
Method to get a view of one of our segments.
ConstArrayView< Int32 > machineRanks() const
Method to get the ranks that own a segment in the window.
View of an array of elements of type T.
Definition Span.h:805
constexpr __host__ __device__ pointer data() const noexcept
Pointer to the start of the view.
Definition Span.h:539
constexpr __host__ __device__ bool empty() const noexcept
Returns true if the array is empty (zero dimension).
Definition Span.h:492
constexpr __host__ __device__ SizeType size() const noexcept
Returns the size of the array.
Definition Span.h:327
View of an array of elements of type T.
Definition Span.h:635
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
std::int32_t Int32
Signed integer type of 32 bits.