14#include "arcane/parallel/thread/SharedMemoryParallelMng.h"
16#include "arcane/utils/NotImplementedException.h"
17#include "arcane/utils/NotSupportedException.h"
18#include "arcane/utils/PlatformUtils.h"
19#include "arcane/utils/TraceInfo.h"
20#include "arcane/utils/NumericTypes.h"
21#include "arcane/utils/ArgumentException.h"
22#include "arcane/utils/FatalErrorException.h"
23#include "arcane/utils/ITraceMng.h"
25#include "arcane/parallel/IStat.h"
27#include "arcane/parallel/thread/SharedMemoryParallelDispatch.h"
28#include "arcane/parallel/thread/ISharedMemoryMessageQueue.h"
29#include "arcane/parallel/thread/internal/SharedMemoryMachineShMemWinBaseInternalCreator.h"
30#include "arcane/parallel/thread/internal/SharedMemoryContigMachineShMemWinBaseInternal.h"
31#include "arcane/parallel/thread/internal/SharedMemoryMachineShMemWinBaseInternal.h"
33#include "arcane/core/Timer.h"
34#include "arcane/core/IIOMng.h"
35#include "arcane/core/ISerializeMessageList.h"
36#include "arcane/core/IItemFamily.h"
37#include "arcane/core/internal/SerializeMessage.h"
38#include "arcane/core/internal/ParallelMngInternal.h"
39#include "arcane/core/internal/MachineShMemWinMemoryAllocator.h"
41#include "arcane/impl/TimerMng.h"
42#include "arcane/impl/ParallelReplication.h"
43#include "arcane/impl/internal/ParallelMngUtilsFactoryBase.h"
45#include "arccore/message_passing/RequestListBase.h"
46#include "arccore/message_passing/internal/SerializeMessageList.h"
55extern "C++" ARCANE_IMPL_EXPORT
IIOMng*
75 explicit RequestList(SharedMemoryParallelMng* pm)
77 , m_message_queue(pm->m_message_queue)
78 , m_local_rank(m_parallel_mng->commRank())
86 case Parallel::WaitAll:
87 return m_message_queue->waitAll(_requests());
89 return m_message_queue->waitSome(m_local_rank, _requests(), _requestsDone(),
false);
91 return m_message_queue->waitSome(m_local_rank, _requests(), _requestsDone(),
true);
97 SharedMemoryParallelMng* m_parallel_mng;
106:
public ParallelMngInternal
111 : ParallelMngInternal(pm)
113 , m_window_creator(window_creator)
117 ~Impl()
override =
default;
133 return makeRef(m_window_creator->createWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type));
138 return makeRef(m_window_creator->createDynamicWindow(m_parallel_mng->commRank(), sizeof_segment, sizeof_type));
148 return m_window_creator->machineRanks();
153 m_window_creator->machineBarrier();
158 SharedMemoryParallelMng* m_parallel_mng;
166SharedMemoryParallelMng::
169, m_trace(build_info.trace_mng)
170, m_thread_mng(build_info.thread_mng)
171, m_sequential_parallel_mng(build_info.sequential_parallel_mng)
172, m_timer_mng(nullptr)
174, m_world_parallel_mng(build_info.world_parallel_mng)
176, m_message_queue(build_info.message_queue)
177, m_is_parallel(build_info.nb_rank != 1)
178, m_rank(build_info.rank)
179, m_nb_rank(build_info.nb_rank)
180, m_is_initialized(false)
181, m_stat(
Parallel::createDefaultStat())
182, m_thread_barrier(build_info.thread_barrier)
183, m_all_dispatchers(build_info.all_dispatchers)
184, m_sub_builder_factory(build_info.sub_builder_factory)
185, m_parent_container_ref(build_info.container)
186, m_mpi_communicator(build_info.communicator)
188, m_parallel_mng_internal(new Impl(this, build_info.window_creator))
190 if (!m_world_parallel_mng)
191 m_world_parallel_mng =
this;
197SharedMemoryParallelMng::
198~SharedMemoryParallelMng()
200 delete m_parallel_mng_internal;
201 delete m_replication;
202 m_sequential_parallel_mng.reset();
214 class DispatchCreator
218 DispatchCreator(ITraceMng* tm, SharedMemoryParallelMng* mpm,
219 ISharedMemoryMessageQueue* message_queue,
220 SharedMemoryAllDispatcher* all_dispatchers)
223 , m_message_queue(message_queue)
224 , m_all_dispatchers(all_dispatchers)
229 template <
typename DataType> SharedMemoryParallelDispatch<DataType>*
232 ISharedMemoryMessageQueue* tmq = m_message_queue;
233 SharedMemoryAllDispatcher* ad = m_all_dispatchers;
234 auto& field = ad->instance((DataType*)
nullptr);
235 return new SharedMemoryParallelDispatch<DataType>(m_tm, m_mpm, tmq, field);
239 SharedMemoryParallelMng* m_mpm;
240 ISharedMemoryMessageQueue* m_message_queue;
241 SharedMemoryAllDispatcher* m_all_dispatchers;
254 DispatchCreator creator(m_trace.get(),
this, m_message_queue, m_all_dispatchers);
255 this->createDispatchers(creator);
257 m_io_mng = arcaneCreateIOMng(
this);
268 m_trace->warning() <<
"SharedMemoryParallelMng already initialized";
281 return m_utils_factory->createGetVariablesValuesOperation(
this)._release();
287 return m_utils_factory->createTransferValuesOperation(
this)._release();
293 return m_utils_factory->createExchanger(
this)._release();
302void SharedMemoryParallelMng::
305 auto p2p_message =
buildMessage(dest_rank, Parallel::Blocking);
316 ARCANE_UNUSED(bytes);
326 return m_utils_factory->createSendSerializeMessage(
this, rank)._release();
332void SharedMemoryParallelMng::
344 m_message_queue->waitAll(requests);
347 recvSerializer(values, rank);
354void SharedMemoryParallelMng::
355recvSerializer(ISerializer* values,
Int32 rank)
357 auto p2p_message =
buildMessage(rank, Parallel::Blocking);
358 Request r = m_message_queue->addReceive(p2p_message, ReceiveBufferInfo(values));
359 m_message_queue->waitAll(ArrayView<Request>(1, &r));
368 return m_utils_factory->createReceiveSerializeMessage(
this, rank)._release();
377 ARCANE_UNUSED(requests);
388 m_stat->print(m_trace.get());
397 m_thread_barrier->wait();
407 m_message_queue->waitAll(requests);
409 m_stat->add(
"WaitAll", (end_time - begin_time), 0);
416_createSerializeMessageList()
429 return m_message_queue->probe(p2p_message);
440 return m_message_queue->legacyProbe(p2p_message);
446auto SharedMemoryParallelMng::
450 return m_message_queue->addSend(p2p_message,
SendBufferInfo(values));
456auto SharedMemoryParallelMng::
469 return m_utils_factory->createSynchronizer(
this, family)._release();
478 return m_utils_factory->createSynchronizer(
this, group)._release();
487 return m_utils_factory->createTopology(
this)._release();
496 return m_replication;
505 delete m_replication;
515 ARCANE_UNUSED(kept_ranks);
529 if (kept_ranks.
empty())
538 Int32 my_new_rank = (-1);
539 for (
Integer i = 0; i < nb_rank; ++i)
540 if (kept_ranks[i] ==
m_rank) {
548 builder = m_sub_builder_factory->_createParallelMngBuilder(nb_rank, m_mpi_communicator, m_mpi_communicator);
550 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder = builder;
554 builder = m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder;
558 if (my_new_rank >= 0) {
559 new_parallel_mng = builder->_createParallelMng(my_new_rank,
traceMng());
570 m_all_dispatchers->m_create_sub_parallel_mng_info.m_builder.reset();
574 return new_parallel_mng;
591sequentialParallelMngRef()
593 return m_sequential_parallel_mng;
599 return m_sequential_parallel_mng.get();
619 return buildMessage({ MessageRank(dest), blocking_mode });
628 return m_utils_factory;
#define ARCANE_CHECK_POINTER(ptr)
Macro returning the pointer ptr if it is not null or throwing an exception if it is null.
#define ARCANE_THROW(exception_class,...)
Macro for throwing an exception with formatting.
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
Modifiable view of an array of type T.
void add(ConstReferenceType val)
Adds element val to the end of the array.
Constant view of an array of type T.
constexpr Integer size() const noexcept
Number of elements in the array.
constexpr bool empty() const noexcept
true if the array is empty (size()==0)
Operations to access variable values from another subdomain.
Interface of the input/output manager.
Interface of an entity family.
Information exchange between processors.
Interface of the parallelism manager for a subdomain.
Brief information on parallel subdomain replication.
Information on the computing core allocation topology.
Sends values across different processors.
Interface of a variable synchronization service.
Options to configure allocations.
Interface for a serialization message list.
Interface for a serialization message between IMessagePassingMng.
Interface of a message queue with threads.
Information about the source of a message.
Information for sending/receiving a point-to-point message.
void setEmiterRank(MessageRank rank)
Sets the rank of the message sender.
Receive buffer information.
Int32 masterParallelIORank() const override
bool isMachineShMemWinAvailable() override
Method indicating whether shared-memory mode is supported.
MemoryAllocationOptions machineShMemWinMemoryAllocator() override
Method allowing retrieval of a shared memory allocator.
void initializeWindowCreator() override
Method that initializes the implementation-specific window creator.
Ref< IMachineShMemWinBaseInternal > createMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Method allowing the creation of a dynamic memory window on the node.
ConstArrayView< Int32 > machineRanks() override
Method allowing retrieval of the ranks of the sub-domains of the computing node.
Int32 nbSendersToMasterParallelIO() const override
Ref< IContigMachineShMemWinBaseInternal > createContigMachineShMemWinBase(Int64 sizeof_segment, Int32 sizeof_type) override
Method allowing the creation of a memory window on the node.
void machineBarrier() override
Method allowing a barrier for the sub-domains of the computing node.
Implementation of IRequestList for SharedMemoryParallelMng.
void _wait(Parallel::eWaitType wait_type) override
Performs the wait or test.
IParallelTopology * createTopology() override
Creates an instance containing information about the rank topology of this manager.
IGetVariablesValuesParallelOperation * createGetVariablesValuesOperation() override
Returns an operation to retrieve the values of a variable on the entities of another subdomain.
Ref< Parallel::IRequestList > createRequestListRef() override
Creates a request list for this manager.
void build() override
Constructs the instance.
Int32 m_nb_rank
Number of ranks.
bool m_is_initialized
true if already initialized
void printStats() override
Prints statistics related to this parallelism manager.
void initialize() override
Initializes the parallelism manager.
void barrier() override
Performs a barrier.
void setReplication(IParallelReplication *v) override
Sets the Replication Information.
PointToPointMessageInfo buildMessage(Int32 dest, MP::eBlockingType is_blocking)
Constructs a message with destination dest.
void waitAllRequests(ArrayView< Request > requests) override
Blocks until all the requests in requests have completed.
Ref< IParallelMngUtilsFactory > _internalUtilsFactory() const override
Factory for utility functions.
IVariableSynchronizer * createSynchronizer(IItemFamily *family) override
Returns an interface for synchronizing variables on the group of the family.
MessageId probe(const PointToPointMessageInfo &message) override
Probes if messages are available.
ISerializeMessage * createSendSerializer(Int32 rank) override
Creates a non-blocking message to send serialized data to rank rank.
ITraceMng * traceMng() const override
Trace manager.
IParallelMng * sequentialParallelMng() override
Returns a sequential parallelism manager.
Int32 m_rank
Rank of the instance.
IParallelExchanger * createExchanger() override
Returns an interface for transferring messages between processors.
void freeRequests(ArrayView< Parallel::Request > requests) override
Frees the requests.
ISerializeMessage * createReceiveSerializer(Int32 rank) override
Creates a non-blocking message to receive serialized data from rank rank.
Ref< IParallelMng > createSubParallelMngRef(Int32ConstArrayView kept_ranks) override
Creates a new parallelism manager for a subset of ranks.
ITransferValuesParallelOperation * createTransferValuesOperation() override
Returns an operation to transfer values between subdomains.
MessageSourceInfo legacyProbe(const PointToPointMessageInfo &message) override
Probes if messages are available.
IParallelReplication * replication() const override
Replication information.
Base class of a request list.
Exception when a function is not implemented.
Exception when an operation is not supported.
Redirects the message management of sub-domains according to the argument type.
IMessagePassingMng * messagePassingMng() const override
Associated Arccore message passing manager.
Base class of a factory for IParallelMng utility functions.
Brief information on parallel subdomain replication.
InstanceType * get() const
Associated instance or nullptr if none.
Reference to an instance.
1D data vector with value semantics (STL style).
Declarations of types and methods used by message exchange mechanisms.
@ WaitSome
Wait until at least one message in the list is processed.
eBlockingType
Type indicating whether a message is blocking or not.
Concurrency implementation.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
Ref< TrueType > createRef(Args &&... args)
Creates an instance of type TrueType with arguments Args and returns a reference to it.
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
Array< Byte > ByteArray
Dynamic one-dimensional array of characters.
ConstArrayView< Int32 > Int32ConstArrayView
C equivalent of a 1D array of 32-bit integers.
double Real
Type representing a real number.
auto makeRef(InstanceType *t) -> Ref< InstanceType >
Creates a reference on a pointer.
std::int32_t Int32
Signed integer type of 32 bits.
Info to construct a SharedMemoryParallelMng.