14#include "arcane/parallel/mpithread/HybridParallelMng.h"
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/FatalErrorException.h"
19#include "arcane/utils/NumericTypes.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/IThreadBarrier.h"
22#include "arcane/utils/ITraceMng.h"
24#include "arcane/core/IIOMng.h"
25#include "arcane/core/Timer.h"
26#include "arcane/core/ISerializeMessageList.h"
27#include "arcane/core/IItemFamily.h"
28#include "arcane/core/IParallelTopology.h"
29#include "arcane/core/internal/ParallelMngInternal.h"
30#include "arcane/core/internal/SerializeMessage.h"
31#include "arcane/core/internal/MachineShMemWinMemoryAllocator.h"
32#include "arcane/core/parallel/IStat.h"
34#include "arcane/parallel/mpithread/HybridParallelDispatch.h"
35#include "arcane/parallel/mpithread/HybridMessageQueue.h"
36#include "arcane/parallel/mpithread/internal/HybridMachineShMemWinBaseInternalCreator.h"
37#include "arcane/parallel/mpithread/internal/HybridContigMachineShMemWinBaseInternal.h"
38#include "arcane/parallel/mpithread/internal/HybridMachineShMemWinBaseInternal.h"
40#include "arcane/parallel/mpi/MpiParallelMng.h"
42#include "arcane/impl/TimerMng.h"
43#include "arcane/impl/ParallelReplication.h"
44#include "arcane/impl/SequentialParallelMng.h"
45#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
48#include "arccore/message_passing/RequestListBase.h"
49#include "arccore/message_passing/internal/SerializeMessageList.h"
68class HybridSerializeMessageList
73 class HybridSerializeMessageRequest
92 , m_trace(mpm->traceMng())
100 m_messages_to_process.add(msg);
109 case Parallel::WaitAll:
112 _wait(Parallel::WaitAll);
113 m_messages_to_process.clear();
136void HybridSerializeMessageList::
139 m_trace->
info() <<
"BEGIN PROCESS MESSAGES";
159 if (wait_mode == Parallel::WaitAll)
160 message_queue->waitAll(all_requests);
162 for (ISerializeMessage* sm : messages)
163 sm->setFinished(
true);
173:
public ParallelMngInternal
178 : ParallelMngInternal(pm)
180 , m_window_creator(window_creator)
184 ~Impl()
override =
default;
190 return m_parallel_mng->m_mpi_parallel_mng->commRank() * m_parallel_mng->m_local_nb_rank;
194 return m_parallel_mng->m_local_nb_rank;
199 m_parallel_mng->traceMng()->debug() <<
"initializeWindowCreator() Hybrid";
200 m_window_creator->initializeMpiWindowCreator(m_parallel_mng->commRank(), m_parallel_mng->mpiParallelMng());
205 if (m_shmem_available == 1) {
209 if (m_shmem_available == 0) {
211 if (topo->machineRanks().size() == m_window_creator->machineRanks().size()) {
212 m_shmem_available = 1;
216 m_shmem_available = 2;
225 return makeRef(m_window_creator->createWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
230 return makeRef(m_window_creator->createDynamicWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type, m_parallel_mng->mpiParallelMng()));
240 return m_window_creator->machineRanks();
245 m_window_creator->machineBarrier(m_parallel_mng->commRank(), m_parallel_mng->mpiParallelMng());
250 HybridParallelMng* m_parallel_mng;
257 Int8 m_shmem_available = 0;
269, m_trace(bi.trace_mng)
270, m_thread_mng(bi.thread_mng)
271, m_world_parallel_mng(bi.world_parallel_mng)
273, m_timer_mng(nullptr)
275, m_message_queue(new
HybridMessageQueue(bi.message_queue, bi.mpi_parallel_mng, bi.local_nb_rank))
276, m_is_initialized(false)
277, m_stat(
Parallel::createDefaultStat())
278, m_thread_barrier(bi.thread_barrier)
279, m_mpi_parallel_mng(bi.mpi_parallel_mng)
280, m_all_dispatchers(bi.all_dispatchers)
281, m_sub_builder_factory(bi.sub_builder_factory)
282, m_parent_container_ref(bi.container)
284, m_parallel_mng_internal(new Impl(this, bi.window_creator))
286 if (!m_world_parallel_mng)
287 m_world_parallel_mng =
this;
294 Int32 mpi_rank = m_mpi_parallel_mng->commRank();
295 Int32 mpi_size = m_mpi_parallel_mng->commSize();
309 m_sequential_parallel_mng.reset();
310 delete m_replication;
312 delete m_message_queue;
315 delete m_mpi_parallel_mng;
316 delete m_parallel_mng_internal;
325 class DispatchCreator
329 DispatchCreator(ITraceMng* tm, HybridParallelMng* mpm, HybridMessageQueue* message_queue, MpiThreadAllDispatcher* all_dispatchers)
332 , m_message_queue(message_queue)
333 , m_all_dispatchers(all_dispatchers)
338 template <
typename DataType> HybridParallelDispatch<DataType>*
341 HybridMessageQueue* tmq = m_message_queue;
342 MpiThreadAllDispatcher* ad = m_all_dispatchers;
343 auto field = ad->instance((DataType*)
nullptr).view();
344 return new HybridParallelDispatch<DataType>(m_tm, m_mpm, tmq, field);
348 HybridParallelMng* m_mpm;
349 HybridMessageQueue* m_message_queue;
350 MpiThreadAllDispatcher* m_all_dispatchers;
361 tm->
info() <<
"Initialise HybridParallelMng"
364 <<
" mpi_rank=" << m_mpi_parallel_mng->commRank();
374 m_sequential_parallel_mng = arcaneCreateSequentialParallelMngRef(bi);
377 DispatchCreator creator(m_trace,
this, m_message_queue, m_all_dispatchers);
378 this->createDispatchers(creator);
379 m_io_mng = arcaneCreateIOMng(
this);
381 m_parallel_mng_internal->initializeWindowCreator();
395 m_trace->warning() <<
"HybridParallelMng already initialized";
420 return m_utils_factory->createGetVariablesValuesOperation(
this)._release();
426 return m_utils_factory->createTransferValuesOperation(
this)._release();
432 return m_utils_factory->createExchanger(
this)._release();
441void HybridParallelMng::
444 auto p2p_message =
buildMessage(rank, Parallel::NonBlocking);
445 Request r = m_message_queue->addSend(p2p_message, s);
452auto HybridParallelMng::
455 ARCANE_UNUSED(bytes);
456 auto p2p_message =
buildMessage(rank, Parallel::NonBlocking);
457 return m_message_queue->addSend(p2p_message, s);
466 return m_utils_factory->createSendSerializeMessage(
this, rank)._release();
472void HybridParallelMng::
478 bool is_broadcaster = (rank ==
commRank());
486 if (is_broadcaster) {
487 Int64 total_size = sbuf->totalSize();
493 Int64 total_size = 0;
495 sbuf->preallocate(total_size);
498 sbuf->setFromSizes();
505void HybridParallelMng::
506recvSerializer(ISerializer* s,
Int32 rank)
508 auto p2p_message =
buildMessage(rank, Parallel::NonBlocking);
509 Request r = m_message_queue->addReceive(p2p_message, ReceiveBufferInfo(s));
510 m_message_queue->waitAll(ArrayView<Request>(1, &r));
519 return m_utils_factory->createReceiveSerializeMessage(
this, rank)._release();
528 ARCANE_UNUSED(requests);
540 return m_message_queue->probe(p2p_message);
551 return m_message_queue->legacyProbe(p2p_message);
561 return m_message_queue->addSend(p2p_message, s);
581 m_stat->print(m_trace);
590 m_thread_barrier->wait();
592 m_mpi_parallel_mng->barrier();
593 m_thread_barrier->wait();
600_createSerializeMessageList()
603 x->setAllowAnyRankReceive(
false);
613 return m_utils_factory->createSynchronizer(
this, family)._release();
622 return m_utils_factory->createSynchronizer(
this, group)._release();
631 return m_utils_factory->createTopology(
this)._release();
640 return m_replication;
649 delete m_replication;
659 return m_sequential_parallel_mng.get();
666sequentialParallelMngRef()
668 return m_sequential_parallel_mng;
684 RequestList(HybridParallelMng* pm)
686 , m_message_queue(pm->m_message_queue)
687 , m_local_rank(m_parallel_mng->localRank())
695 case Parallel::WaitAll:
696 m_parallel_mng->m_message_queue->waitAll(_requests());
699 m_message_queue->waitSome(m_local_rank, _requests(), _requestsDone(),
false);
702 m_message_queue->waitSome(m_local_rank, _requests(), _requestsDone(),
true);
708 HybridParallelMng* m_parallel_mng;
729 m_message_queue->waitAll(requests);
738 return m_mpi_parallel_mng->getMPICommunicator();
747 return m_mpi_parallel_mng->communicator();
756 return m_mpi_parallel_mng->machineCommunicator();
782IParallelMng* HybridParallelMng::
785 ARCANE_UNUSED(kept_ranks);
786 ARCANE_THROW(NotSupportedException,
"Use createSubParallelMngRef() instead");
798 if (kept_ranks.
empty())
802 m_trace->info() <<
"CREATE SUB_PARALLEL_MNG_REF";
822 Int32 my_new_global_rank = (-1);
823 Int32 new_local_nb_rank = 0;
824 Int32 my_new_local_rank = (-1);
825 for (
Integer i = 0; i < nb_kept_rank; ++i) {
826 Int32 kept_rank = kept_ranks[i];
827 if (kept_rank >= first_global_rank_in_this_mpi && kept_rank < last_global_rank_in_this_mpi)
830 my_new_global_rank = i;
831 my_new_local_rank = new_local_nb_rank - 1;
834 bool has_new_rank = (my_new_global_rank != (-1));
845 Int32 min_new_local_nb_rank = -1;
846 Int32 max_new_local_nb_rank = -1;
847 Int32 sum_new_local_nb_rank = -1;
848 Int32 min_rank = A_NULL_RANK;
849 Int32 max_rank = A_NULL_RANK;
850 computeMinMaxSum(new_local_nb_rank, min_new_local_nb_rank, max_new_local_nb_rank,
851 sum_new_local_nb_rank, min_rank, max_rank);
853 m_trace->info() <<
"CREATE SUB_PARALLEL_MNG_REF new_local_nb_rank=" << new_local_nb_rank
854 <<
" min=" << min_new_local_nb_rank
855 <<
" max=" << max_new_local_nb_rank
856 <<
" sum=" << sum_new_local_nb_rank
857 <<
" new_global_rank=" << my_new_global_rank;
861 if (max_new_local_nb_rank == 1) {
862 Integer nb_mpi_rank = m_mpi_parallel_mng->commSize();
872 bool do_mpi_call =
false;
873 if (min_new_local_nb_rank == 1) {
876 kept_mpi_ranks.
resize(nb_mpi_rank);
877 for (
Int32 x = 0; x < nb_mpi_rank; ++x)
878 kept_mpi_ranks[x] = x;
887 Int16 v = (has_new_rank) ? 1 : 0;
890 for (
Int32 x = 0; x < nb_mpi_rank; ++x)
891 if (gathered_ranks[x] == 1)
892 kept_mpi_ranks.
add(x);
895 return m_mpi_parallel_mng->createSubParallelMngRef(kept_mpi_ranks);
900 if (max_new_local_nb_rank != new_local_nb_rank)
901 ARCANE_FATAL(
"Not same number of new local ranks on every MPI processus: current={0} max={1}",
902 new_local_nb_rank, max_new_local_nb_rank);
904 if (max_new_local_nb_rank < 2)
905 ARCANE_FATAL(
"number of local ranks is too low current={0} minimum=2", new_local_nb_rank);
908 m_thread_barrier->wait();
921 builder = m_sub_builder_factory->_createParallelMngBuilder(new_local_nb_rank, c, mc);
923 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
926 m_thread_barrier->wait();
928 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
932 if (my_new_local_rank >= 0) {
933 new_parallel_mng = builder->_createParallelMng(my_new_local_rank,
traceMng());
935 m_thread_barrier->wait();
941 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
943 m_thread_barrier->wait();
945 return new_parallel_mng;
954 return m_utils_factory;
960bool HybridParallelMng::
961_isAcceleratorAware()
const
#define ARCANE_CHECK_POINTER(ptr)
Macro returning the pointer ptr if it is not null or throwing an exception if it is null.
#define ARCANE_THROW(exception_class,...)
Macro for throwing an exception with formatting.
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
List of message exchange functions.
Exception when an argument is invalid.
Modifiable view of an array of type T.
void resize(Int64 s)
Changes the number of elements in the array to s.
void add(ConstReferenceType val)
Adds element val to the end of the array.
Constant view of an array of type T.
constexpr Integer size() const noexcept
Number of elements in the array.
constexpr bool empty() const noexcept
true if the array is empty (size()==0)
Operations to access variable values from another subdomain.
Interface of the input/output manager.
Interface of an entity family.
Information exchange between processors.
virtual bool isAcceleratorAware() const =0
Indicates if the implementation handles accelerators.
Interface of the parallelism manager for a subdomain.
virtual void computeMinMaxSum(char val, char &min_val, char &max_val, char &sum_val, Int32 &min_rank, Int32 &max_rank)=0
Calculates the sum, min, and max of a value in one operation.
Information on parallel subdomain replication.
Information on the computing core allocation topology.
virtual TraceMessage info()=0
Stream for an information message.
Sends values across different processors.
Interface of a variable synchronization service.
Options to configure allocations.
Interface for a message queue with threads.
void machineBarrier() override
Performs a barrier across the sub-domains of the computing node.
Int32 masterParallelIORank() const override
void initializeWindowCreator() override
Method allowing the initialization of the windowCreator specific to the implementation.
Int32 nbSendersToMasterParallelIO() const override
MemoryAllocationOptions machineShMemWinMemoryAllocator() override
Method allowing retrieval of a shared memory allocator.
bool isMachineShMemWinAvailable() override
Indicates whether shared memory mode is supported.
ConstArrayView< Int32 > machineRanks() override
Method allowing retrieval of the ranks of the sub-domains of the computing node.
Ref< IContigMachineShMemWinBaseInternal > createContigMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Method allowing the creation of a memory window on the node.
Ref< IMachineShMemWinBaseInternal > createMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Method allowing the creation of a dynamic memory window on the node.
Implementation of IRequestList for HybridParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Performs the wait or test.
Thread-based parallelism manager.
IParallelMng * worldParallelMng() const override
Parallelism manager over all allocated resources.
void freeRequests(ArrayView< Request > requests) override
Frees the requests.
void printStats() override
Prints statistics related to this parallelism manager.
IParallelExchanger * createExchanger() override
Returns an interface for transferring messages between processors.
void barrier() override
Performs a barrier.
MP::Communicator communicator() const override
MPI communicator associated with this manager.
bool m_is_initialized
true if already initialized
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Creates a non-blocking message to receive serialized data from rank rank.
Ref< Parallel::IRequestList > createRequestListRef() override
Creates a request list for this manager.
ITimerMng * timerMng() const override
Timer manager.
void build() override
Constructs the instance.
void waitAllRequests(ArrayView< Request > requests) override
Blocks until all the requests in requests have completed.
void initialize() override
Initializes the parallelism manager.
IThreadMng * threadMng() const override
Thread manager.
Int32 m_global_nb_rank
Total number of global ranks.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Probes if messages are available.
Int32 m_local_rank
Local rank of the current processor.
ITraceMng * traceMng() const override
Trace manager.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Returns an operation to retrieve the values of a variable on the entities of another subdomain.
void * getMPICommunicator() override
Address of the MPI communicator associated with this manager.
IParallelReplication * replication() const override
Replication information.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Factory for utility functions.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Creates a new parallelism manager for a subset of ranks.
MP::Communicator machineCommunicator() const override
MPI communicator derived from the communicator communicator() gathering all processes of the compute ...
ITransferValuesParallelOperation * createTransferValuesOperation() override
Returns an operation to transfer values between subdomains.
IParallelTopology * createTopology() override
Creates an instance containing information about the rank topology of this manager.
void setReplication(IParallelReplication *v) override
Sets the Replication Information.
IParallelMng * sequentialParallelMng() override
Returns a sequential parallelism manager.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Returns an interface for synchronizing variables on the group of the family.
Int32 m_local_nb_rank
Number of local ranks.
MessageId probe(const PointToPointMessageInfo &message) override
Probes if messages are available.
ISerializeMessage * createSendSerializer(Int32 rank) override
Creates a non-blocking message to send serialized data to rank rank.
Int32 commRank() const override
Rank of this instance in the communicator.
Int32 m_global_rank
Current processor number.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Constructs a message with destination dest.
void processPendingMessages() override
Sends the messages in the list that have not yet been sent.
Integer waitMessages(Parallel::eWaitType wait_type) override
Waits until the messages have finished execution.
void addMessage(ISerializeMessage *msg) override
Adds a message to the list.
Interface of the message passing manager.
Interface for a serialization message list.
Interface for a serialization message between IMessagePassingMng.
Information about the source of a message.
Information for sending/receiving a point-to-point message.
void setEmiterRank(MessageRank rank)
Sets the rank of the message sender.
Receive buffer information.
Base class of a request list.
IParallelMngInternal * _internalApi() override
Internal Arcane API.
Exception when a function is not implemented.
Redirects the message management of sub-domains according to the argument type.
IMessagePassingMng * messagePassingMng() const override
Associated Arccore message passing manager.
ITimeStats * timeStats() const override
Associated statistics manager (can be null).
Base class of a factory for IParallelMng utility functions.
Information on parallel subdomain replication.
InstanceType * get() const
Associated instance or nullptr if none.
Reference to an instance.
Implementation of a buffer for serialization.
View of an array of elements of type T.
Sets the phase of the currently executing action.
1D data vector with value semantics (STL style).
Declarations of types and methods used by message exchange mechanisms.
C void mpBroadcast(IMessagePassingMng *pm, Span< char > send_buf, Int32 rank)
@ WaitSome
Wait until at least one message in the list is processed.
eBlockingType
Type indicating whether a message is blocking or not.
Concurrency implementation.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
std::int8_t Int8
Signed integer type of 8 bits.
ArrayView< Int64 > Int64ArrayView
C equivalent of a 1D array of 64-bit integers.
Ref< TrueType > createRef(Args &&... args)
Creates an instance of type TrueType with arguments Args and returns a reference to it.
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
Array< Byte > ByteArray
Dynamic one-dimensional array of characters.
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
std::int16_t Int16
Signed integer type of 16 bits.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.
Info for constructing a HybridParallelMng.
Information to construct a SequentialParallelMng.